Browse Source

En cours de développement

master
losyme 3 months ago
parent
commit
6c8d692fc5
  1. 215
      <
  2. 33
      workers/worker.go

215
<

@ -1,215 +0,0 @@
/*
------------------------------------------------------------------------------------------------------------------------
####### sdk ####### Copyright (c) 2021-2022 losyme ################################################# MIT License #######
------------------------------------------------------------------------------------------------------------------------
*/
package client
import (
"context"
"net/http"
"forge.chapril.org/dune/jw"
"forge.chapril.org/losyme/breaker"
"forge.chapril.org/losyme/errors"
"forge.chapril.org/losyme/king"
"forge.chapril.org/losyme/king/request"
_kong "forge.chapril.org/losyme/kong/context"
)
const (
_errRetry = errors.Sentinel("retry")
_errJobCreated = errors.Sentinel("job created")
_errWorkflowCreated = errors.Sentinel("workflow created")
ErrJobRunning = errors.Sentinel("job running")
ErrJobUpdated = errors.Sentinel("job updated")
)
type endpoint struct {
*Endpoint
breaker *breaker.Breaker
client *http.Client
}
func (ep *endpoint) isReady() bool {
return ep.breaker.IsReady()
}
func (ep *endpoint) update(success bool) {
ep.breaker.Update(success)
}
func (ep *endpoint) afterDoRequest(err error) error {
if err == nil {
ep.update(true)
return nil
}
ep.update(false)
return errors.Wrap(_errRetry, err)
}
func onError(resp *http.Response) error {
respErr := new(_kong.Error)
if err := king.DecodeJSON(resp, respErr); err != nil {
return errors.New(http.StatusText(resp.StatusCode), "status", resp.StatusCode) /////////////////////////////////
}
return respErr
}
func (ep *endpoint) createJob(reqID string, content []byte) (*jw.Job, error) {
var job *jw.Job
err := request.
New(ep.URL).
Path("api/jobs").
Post().
BasicAuth(ep.Username, ep.Password).
ContentTypeJSON().
ID(reqID).
BodyData(content).
Client(ep.client).
Execute(
context.TODO(), // @FIXME
ep.afterDoRequest,
func(resp *http.Response) error {
switch resp.StatusCode {
case http.StatusNoContent:
return nil
case http.StatusCreated:
to := new(jw.Job)
if err := king.DecodeJSON(resp, to); err != nil {
return errors.Wrap(_errJobCreated, err) ////////////////////////////////////////////////////////
}
job = to
return nil
}
return onError(resp)
},
)
return job, err
}
func (ep *endpoint) nextJob(reqID, namespace string) (*jw.Job, error) {
var job *jw.Job
err := request.
New(ep.URL).
Path("api/jobs/next/"+namespace).
Get().
BasicAuth(ep.Username, ep.Password).
ID(reqID).
Client(ep.client).
Execute(
context.TODO(), // @FIXME
ep.afterDoRequest,
func(resp *http.Response) error {
switch resp.StatusCode {
case http.StatusNoContent:
return nil
case http.StatusOK:
to := new(jw.Job)
if err := king.DecodeJSON(resp, to); err != nil {
return errors.Wrap(ErrJobRunning, err) /////////////////////////////////////////////////////////
}
job = to
return nil
}
return onError(resp)
},
)
return job, err
}
func (ep *endpoint) updateJob(reqID string, content []byte) (*jw.Job, error) {
var job *jw.Job
err := request.
New(ep.URL).
Path("api/jobs").
Put().
BasicAuth(ep.Username, ep.Password).
ContentTypeJSON().
ID(reqID).
BodyData(content).
Client(ep.client).
Execute(
context.TODO(), // @FIXME
ep.afterDoRequest,
func(resp *http.Response) error {
switch resp.StatusCode {
case http.StatusOK:
to := new(jw.Job)
if err := king.DecodeJSON(resp, to); err != nil {
return errors.Wrap(ErrJobUpdated, err) /////////////////////////////////////////////////////////
}
job = to
return nil
}
return onError(resp)
},
)
return job, err
}
func (ep *endpoint) createWorkflow(reqID string, content []byte) (*jw.Workflow, error) {
var wf *jw.Workflow
err := request.
New(ep.URL).
Path("api/workflows").
Post().
BasicAuth(ep.Username, ep.Password).
ContentTypeJSON().
ID(reqID).
BodyData(content).
Client(ep.client).
Execute(
context.TODO(), // @FIXME
ep.afterDoRequest,
func(resp *http.Response) error {
switch resp.StatusCode {
case http.StatusCreated:
to := new(jw.Workflow)
if err := king.DecodeJSON(resp, to); err != nil {
return errors.Wrap(_errWorkflowCreated, err) ///////////////////////////////////////////////////
}
wf = to
return nil
}
return onError(resp)
},
)
return wf, err
}
/*
######################################################################################################## @(°_°)@ #######
*/

33
workers/worker.go

@ -50,23 +50,28 @@ func (w *worker) updateJob(job *jw.Job) {
job.Result.Host = w.host
job.Result.Worker = w.id
jobID := job.ID
job, err := w.model.UpdateJob(job)
if logger := w.logger; logger != nil {
if err == nil {
if job.Status == jw.StatusPending {
logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"END",
"job", job.ID,
"status", job.Status,
"run_after", job.RunAfter.String(),
"attempt", fmt.Sprintf("%d/%d", job.Attempt, job.MaxAttempts),
)
} else {
logger.Info("END", "job", job.ID, "status", job.Status) //::::::::::::::::::::::::::::::::::::::::::::::
}
} else {
logger.Error("END", "job", job.ID, nil, err) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
if err != nil {
logger.Error("END", "job", jobID, nil, err) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
return
}
if job.Status == jw.StatusPending {
logger.Info( //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
"END",
"job", job.ID,
"status", job.Status,
"run_after", job.RunAfter.String(),
"attempt", fmt.Sprintf("%d/%d", job.Attempt, job.MaxAttempts),
)
return
}
logger.Info("END", "job", job.ID, "status", job.Status) //::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
}

Loading…
Cancel
Save