You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
196 lines
4.4 KiB
196 lines
4.4 KiB
/* |
|
------------------------------------------------------------------------------------------------------------------------ |
|
####### sdk ####### Copyright (c) 2021-2022 losyme ################################################# MIT License ####### |
|
------------------------------------------------------------------------------------------------------------------------ |
|
*/ |
|
|
|
package workers |
|
|
|
import ( |
|
"context" |
|
"errors" |
|
"fmt" |
|
"runtime/debug" |
|
"time" |
|
|
|
"forge.chapril.org/dune/jw" |
|
"forge.chapril.org/dune/sdk" |
|
"forge.chapril.org/losyme/uuid" |
|
) |
|
|
|
// Delays handed back by maybeRunJob and used by loop to re-arm its polling
// timer.
const (
	// _timeoutNoJob is the delay before the next poll when the model had no
	// job to hand out.
	_timeoutNoJob = 2 * time.Second

	// _timeoutOnError is the delay before the next poll when asking the model
	// for the next job returned an error.
	_timeoutOnError = 9 * time.Second
)
|
|
|
type worker struct { |
|
id string |
|
host string |
|
model sdk.Model |
|
runner sdk.Runner |
|
logger Logger |
|
state *state |
|
} |
|
|
|
func newWorker(host string, model sdk.Model, runner sdk.Runner, logger Logger, state *state) *worker { |
|
return &worker{ |
|
id: uuid.New(), |
|
host: host, |
|
model: model, |
|
runner: runner, |
|
logger: logger, |
|
state: state, |
|
} |
|
} |
|
|
|
func (w *worker) updateJob(job *jw.Job) { |
|
if job.Result == nil { |
|
job.Succeeded() |
|
} |
|
|
|
job.Result.Host = w.host |
|
job.Result.Worker = w.id |
|
|
|
jobID := job.ID |
|
|
|
job, err := w.model.UpdateJob(job) |
|
if logger := w.logger; logger != nil { |
|
if err != nil { |
|
logger.Error( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: |
|
"END", |
|
"job", jobID, |
|
"workflow", job.WorkflowValue(), |
|
nil, err, |
|
) |
|
return |
|
} |
|
|
|
if job.Status == jw.StatusPending { |
|
logger.Info( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: |
|
"END", |
|
"job", job.ID, |
|
"workflow", job.WorkflowValue(), |
|
"status", job.Status, |
|
"run_after", job.RunAfter.String(), |
|
"attempt", fmt.Sprintf("%d/%d", job.Attempt, job.MaxAttempts), |
|
) |
|
|
|
return |
|
} |
|
|
|
logger.Info( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: |
|
"END", |
|
"job", job.ID, |
|
"workflow", job.WorkflowValue(), |
|
"status", job.Status, |
|
) |
|
} |
|
} |
|
|
|
func (w *worker) runJob(job *jw.Job) { |
|
defer func() { |
|
if data := recover(); data != nil { |
|
if w.logger != nil { |
|
w.logger.Error( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: |
|
"PANIC ERROR RECOVERED", |
|
"job", job.ID, |
|
"workflow", job.WorkflowValue(), |
|
"data", data, |
|
"stack", string(debug.Stack()), |
|
) |
|
} |
|
|
|
job.Failed().SetError(fmt.Sprintf("PANIC ERROR RECOVERED: %v", data)) |
|
} |
|
}() |
|
|
|
if err := w.runner.Run(job); err != nil { |
|
result := new(jw.Result) |
|
|
|
if errors.As(err, &result) { |
|
job.Result = result |
|
} else { |
|
job.Failed().SetError(err.Error()) |
|
} |
|
|
|
if w.logger != nil { |
|
w.logger.Error( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: |
|
"RUN", |
|
"job", job.ID, |
|
"workflow", job.WorkflowValue(), |
|
nil, err, |
|
) |
|
} |
|
} |
|
} |
|
|
|
func (w *worker) maybeRunJob() time.Duration { |
|
job, err := w.model.NextJob(w.runner.Namespace()) |
|
if err != nil { |
|
return _timeoutOnError |
|
} |
|
|
|
if job == nil { |
|
return _timeoutNoJob |
|
} |
|
|
|
count := w.state.addJob(w.id, job) |
|
defer w.state.removeJob(w.id) |
|
|
|
if logger := w.logger; logger != nil { |
|
logger.Info("JOB", job.Fields()...) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: |
|
logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: |
|
"Worker", |
|
"id", w.id, |
|
"job", job.ID, |
|
"workflow", job.WorkflowValue(), |
|
"count", count, |
|
) |
|
} |
|
|
|
w.runJob(job) |
|
w.updateJob(job) |
|
|
|
return 0 |
|
} |
|
|
|
func (w *worker) loop(ctx context.Context, stopCh <-chan bool) { |
|
timer := time.NewTimer(0) |
|
|
|
defer func() { |
|
if !timer.Stop() { |
|
<-timer.C |
|
} |
|
}() |
|
|
|
for { |
|
select { |
|
case <-ctx.Done(): |
|
return |
|
case <-stopCh: |
|
return |
|
case <-timer.C: |
|
timer.Reset(w.maybeRunJob()) |
|
} |
|
} |
|
} |
|
|
|
func (w *worker) run(ctx context.Context, stopCh <-chan bool, stoppedCh chan<- string) { |
|
if w.logger != nil { |
|
w.logger.Info("Worker started", "id", w.id) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: |
|
} |
|
|
|
w.state.addWorker(w.id) |
|
w.loop(ctx, stopCh) |
|
w.state.removeWorker(w.id) |
|
|
|
if w.logger != nil { |
|
w.logger.Info("Worker stopped", "id", w.id) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: |
|
} |
|
|
|
stoppedCh <- w.id |
|
} |
|
|
|
/* |
|
######################################################################################################## @(°_°)@ ####### |
|
*/
|
|
|