Browse Source

En cours de développement

master
losyme 5 months ago
parent
commit
84b26b563a
  1. 50
      workers/supervisor.go
  2. 4
      workers/worker.go

50
workers/supervisor.go

@ -7,8 +7,6 @@
package workers
import (
"context"
"forge.chapril.org/losyme/zombie"
)
@ -17,7 +15,7 @@ type supervisor struct {
logger Logger
pool *pool
eventCh chan string
zombie *zombie.GroupWithContext
zombie *zombie.Group
}
func newSupervisor(poolSize int, logger Logger, pool *pool) *supervisor {
@ -25,11 +23,10 @@ func newSupervisor(poolSize int, logger Logger, pool *pool) *supervisor {
poolSize: poolSize,
logger: logger,
pool: pool,
eventCh: make(chan string),
}
}
func (s *supervisor) loop(ctx context.Context) {
func (s *supervisor) loop() {
end := false
for {
@ -45,35 +42,35 @@ func (s *supervisor) loop(ctx context.Context) {
}
}
select {
case <-ctx.Done():
e := <-s.eventCh
switch e {
case "stop":
if !end {
end = true
s.pool.stop()
}
case e := <-s.eventCh:
switch e {
case "-1":
if !end && s.poolSize > 0 {
s.poolSize--
}
case "+1":
if !end && s.poolSize < _maxPoolSize {
s.poolSize++
}
default: // wID
size := s.pool.workerStopped(e, end)
if end && size == 0 {
return
}
case "-1":
if !end && s.poolSize > 0 {
s.poolSize--
}
case "+1":
if !end && s.poolSize < _maxPoolSize {
s.poolSize++
}
default: // wID
size := s.pool.workerStopped(e, end)
if end && size == 0 {
return
}
}
}
}
func (s *supervisor) start() {
s.zombie = zombie.GoWithContext(
context.Background(),
s.eventCh = make(chan string, 1)
s.zombie = zombie.Go(
1,
s.loop,
zombie.WithName("supervisor"),
@ -90,9 +87,12 @@ func (s *supervisor) stopOneWorker() {
}
func (s *supervisor) stop() {
s.zombie.Stop()
s.eventCh <- "stop"
s.pool.wait()
s.zombie.Wait()
close(s.eventCh)
}
/*

4
workers/worker.go

@ -59,7 +59,7 @@ func (w *worker) loop(ctx context.Context, stopCh <-chan bool) {
func (w *worker) run(ctx context.Context, stopCh <-chan bool, stoppedCh chan<- string) {
if w.logger != nil {
w.logger.Info("Started", "id", w.id) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
w.logger.Info("Worker started", "id", w.id) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
w.state.addWorker(w.id)
@ -67,7 +67,7 @@ func (w *worker) run(ctx context.Context, stopCh <-chan bool, stoppedCh chan<- s
w.state.removeWorker(w.id)
if w.logger != nil {
w.logger.Info("Stopped", "id", w.id) //:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
w.logger.Info("Worker stopped", "id", w.id) //::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
}
stoppedCh <- w.id

Loading…
Cancel
Save