refactor server code for better testing

This commit is contained in:
Alexander Baryshnikov 2020-09-17 20:07:34 +08:00
parent de675ba718
commit 31e20b242e
2 changed files with 61 additions and 25 deletions

View File

@ -60,6 +60,7 @@ func getTask(wrk *worker.Worker) http.HandlerFunc {
http.Error(writer, "encoding", http.StatusInternalServerError)
return
}
writer.Header().Set("X-Correlation-Id", requestID)
writer.Header().Set("Content-Type", "application/json")
writer.Header().Set("Content-Length", strconv.Itoa(len(data)))
writer.Header().Set("Content-Version", strconv.Itoa(len(info.Attempts)))

View File

@ -8,10 +8,11 @@ import (
"net/http"
"os"
"path/filepath"
"sync"
"time"
"gopkg.in/yaml.v2"
"nano-run/worker"
)
type Config struct {
@ -74,58 +75,62 @@ func (cfg Config) SaveFile(file string) error {
return ioutil.WriteFile(file, data, 0600)
}
func (cfg Config) Run(global context.Context) error {
func (cfg Config) Create(global context.Context) (*Server, error) {
units, err := Units(cfg.ConfigDirectory)
if err != nil {
return err
return nil, err
}
workers, err := Workers(cfg.WorkingDirectory, units)
if err != nil {
return err
return nil, err
}
defer func() {
for _, wrk := range workers {
wrk.Close()
}
}()
handler := Handler(units, workers)
ctx, cancel := context.WithCancel(global)
srv := &Server{
Handler: Handler(units, workers),
workers: workers,
units: units,
done: make(chan struct{}),
cancel: cancel,
}
go srv.run(ctx)
return srv, nil
}
func (cfg Config) Run(global context.Context) error {
ctx, cancel := context.WithCancel(global)
defer cancel()
srv, err := cfg.Create(global)
if err != nil {
return err
}
defer srv.Close()
server := http.Server{
Addr: cfg.Bind,
Handler: handler,
Handler: srv,
}
var wg sync.WaitGroup
done := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
<-ctx.Done()
t, c := context.WithTimeout(context.Background(), cfg.GracefulShutdown)
_ = server.Shutdown(t)
c()
close(done)
}()
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
err := Run(ctx, workers)
if err != nil {
log.Println("workers stopped:", err)
}
}()
if cfg.TLS.Enable {
err = server.ListenAndServeTLS(cfg.TLS.Cert, cfg.TLS.Key)
} else {
err = server.ListenAndServe()
}
cancel()
wg.Wait()
return err
<-done
return ctx.Err()
}
func limitRequest(maxSize int64, handler http.Handler) http.Handler {
@ -142,3 +147,33 @@ func limitRequest(maxSize int64, handler http.Handler) http.Handler {
handler.ServeHTTP(writer, request)
})
}
type Server struct {
http.Handler
workers []*worker.Worker
units []Unit
cancel func()
done chan struct{}
err error
}
func (srv *Server) Close() {
for _, wrk := range srv.workers {
wrk.Close()
}
srv.cancel()
<-srv.done
}
func (srv *Server) Err() error {
return srv.err
}
func (srv *Server) run(ctx context.Context) {
err := Run(ctx, srv.workers)
if err != nil {
log.Println("workers stopped:", err)
}
srv.err = err
close(srv.done)
}