Browse Source

En cours de développement

master
losyme 7 months ago
parent
commit
eca76d42e2
  1. 4
      workers/dashboard.go
  2. 58
      workers/worker.go

4
workers/dashboard.go

@ -33,7 +33,7 @@ func newDashboard() *dashboard {
}
}
func (d *dashboard) addJob(wID string, job *jw.Job) {
func (d *dashboard) addJob(wID string, job *jw.Job) int {
d.mutex.Lock()
defer d.mutex.Unlock()
@ -44,6 +44,8 @@ func (d *dashboard) addJob(wID string, job *jw.Job) {
w.JobNamespace = job.Namespace
w.JobType = job.Type
w.StartedAt = time.Now()
return w.Jobs
}
func (d *dashboard) removeJob(wID string) {

58
workers/worker.go

@ -57,13 +57,34 @@ func (w *worker) cloneLogger(job *jw.Job) (Logger, error) {
return w.logger.Clone(id, name)
}
func (w *worker) maybeRunJob() (duration time.Duration) {
func (w *worker) updateJob(job *jw.Job, logger Logger) {
if job.Result == nil {
job.Succeeded()
}
job.Result.Host = w.host
job.Result.Worker = w.id
if job, err := w.runner.Model().UpdateJob(job); err == nil {
if job.Status == jw.StatusPending {
logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"END",
"status", job.Status,
"run_after", job.RunAfter.Round(time.Second).String(),
"attempt", fmt.Sprintf("%d/%d", job.Attempt, job.MaxRetries),
)
} else {
logger.Info("END", "status", job.Status) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
} else {
logger.Info("END", "model", err) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
}
func (w *worker) maybeRunJob() time.Duration {
defer func() {
if data := recover(); data != nil {
w.logger.Fatal("PANIC ERROR RECOVERED", "data", data) //::::::::::::::::::::::::::::::::::::::::::::::::::::
duration = 5 * time.Second
w.stop()
}
}()
@ -74,10 +95,10 @@ func (w *worker) maybeRunJob() (duration time.Duration) {
}
if job == nil {
return 500 * time.Millisecond
return 1 * time.Second
}
w.dashboard.addJob(w.id, job)
jobs := w.dashboard.addJob(w.id, job)
defer w.dashboard.removeJob(w.id)
logger, err := w.cloneLogger(job)
@ -93,32 +114,11 @@ func (w *worker) maybeRunJob() (duration time.Duration) {
logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"Worker",
"id", w.id,
//"jobs", w.jobs, AFINIR
"jobs", jobs,
)
w.runner.RunJob(job, logger)
if job.Result == nil {
job.Succeeded()
}
job.Result.Host = w.host
job.Result.Worker = w.id
if job, err := w.runner.Model().UpdateJob(job); err == nil {
if job.Status == jw.StatusPending {
logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"END",
"status", job.Status,
"run_after", job.RunAfter.Round(time.Second).String(),
"retries", fmt.Sprintf("%d/%d", job.Attempt, job.MaxRetries),
)
} else {
logger.Info("END", "status", job.Status) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
} else {
logger.Info("END", "model", err) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
w.updateJob(job, logger)
return 0
}

Loading…
Cancel
Save