diff --git a/client/client.go b/client/client.go index 13a6363..89e780d 100644 --- a/client/client.go +++ b/client/client.go @@ -112,7 +112,7 @@ func (c *Client) NextJob(namespace string) (*jw.Job, error) { } func (c *Client) UpdateJob(job *jw.Job) (*jw.Job, error) { - return nil, errors.NotImplemented() + return job, nil } func (c *Client) CreateWorkflow(wf *jw.Workflow) (*jw.Workflow, error) { diff --git a/workers/worker.go b/workers/worker.go index 486f8fc..7189f85 100644 --- a/workers/worker.go +++ b/workers/worker.go @@ -8,6 +8,7 @@ package workers import ( "context" + "fmt" "runtime/debug" "time" @@ -42,6 +43,30 @@ func newWorker(host string, model sdk.Model, runner sdk.Runner, logger Logger, s } func (w *worker) updateJob(job *jw.Job) { + if job.Result == nil { + job.Succeeded() + } + + job.Result.Host = w.host + job.Result.Worker = w.id + + job, err := w.model.UpdateJob(job) + if logger := w.logger; logger != nil { + if err == nil { + if job.Status == jw.StatusPending { + logger.Info( //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: + "END", + "status", job.Status, + "run_after", job.RunAfter.String(), + "attempt", fmt.Sprintf("%d/%d", job.Attempt, job.MaxAttempts), + ) + } else { + logger.Info("END", "status", job.Status) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: + } + } else { + logger.Info("END", "model", err) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::: + } + } } func (w *worker) maybeRunJob() time.Duration { @@ -62,7 +87,7 @@ func (w *worker) maybeRunJob() time.Duration { return _timeoutNoJob } - number := w.state.addJob(w.id, job) + count := w.state.addJob(w.id, job) defer w.state.removeJob(w.id) if logger := w.logger; logger != nil { @@ -71,7 +96,7 @@ func (w *worker) maybeRunJob() time.Duration { "Worker", "id", w.id, "job", job.ID, - "number", number, + "number", count, ) }