Browse Source

En cours de développement

master
losyme 5 months ago
parent
commit
c4afe67f45
  1. 2
      client/client.go
  2. 29
      workers/worker.go

2
client/client.go

@ -112,7 +112,7 @@ func (c *Client) NextJob(namespace string) (*jw.Job, error) {
}
func (c *Client) UpdateJob(job *jw.Job) (*jw.Job, error) {
return nil, errors.NotImplemented()
return job, nil
}
func (c *Client) CreateWorkflow(wf *jw.Workflow) (*jw.Workflow, error) {

29
workers/worker.go

@ -8,6 +8,7 @@ package workers
import (
"context"
"fmt"
"runtime/debug"
"time"
@ -42,6 +43,30 @@ func newWorker(host string, model sdk.Model, runner sdk.Runner, logger Logger, s
}
func (w *worker) updateJob(job *jw.Job) {
if job.Result == nil {
job.Succeeded()
}
job.Result.Host = w.host
job.Result.Worker = w.id
job, err := w.model.UpdateJob(job)
if logger := w.logger; logger != nil {
if err == nil {
if job.Status == jw.StatusPending {
logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"END",
"status", job.Status,
"run_after", job.RunAfter.String(),
"attempt", fmt.Sprintf("%d/%d", job.Attempt, job.MaxAttempts),
)
} else {
logger.Info("END", "status", job.Status) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
} else {
logger.Info("END", "model", err) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
}
}
func (w *worker) maybeRunJob() time.Duration {
@ -62,7 +87,7 @@ func (w *worker) maybeRunJob() time.Duration {
return _timeoutNoJob
}
number := w.state.addJob(w.id, job)
count := w.state.addJob(w.id, job)
defer w.state.removeJob(w.id)
if logger := w.logger; logger != nil {
@ -71,7 +96,7 @@ func (w *worker) maybeRunJob() time.Duration {
"Worker",
"id", w.id,
"job", job.ID,
"number", number,
"number", count,
)
}

Loading…
Cancel
Save