From b397ec0b9dadef66ffa001b7a2de3fd19de1dda7 Mon Sep 17 00:00:00 2001 From: Alexander Baryshnikov Date: Sat, 7 Nov 2020 23:07:54 +0800 Subject: [PATCH] add context to scheduler --- server/cron.go | 11 +++++++---- server/runner/handler.go | 7 +++++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/server/cron.go b/server/cron.go index 9d7ece6..bbba820 100644 --- a/server/cron.go +++ b/server/cron.go @@ -2,6 +2,7 @@ package server import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -35,7 +36,7 @@ func (cs *CronSpec) Label(def string) string { return def } -func (cs *CronSpec) Request(requestPath string) (*http.Request, error) { +func (cs *CronSpec) Request(ctx context.Context, requestPath string) (*http.Request, error) { var src io.ReadCloser if cs.Content != "" || cs.ContentFile == "" { src = ioutil.NopCloser(bytes.NewReader([]byte(cs.Content))) @@ -45,7 +46,7 @@ func (cs *CronSpec) Request(requestPath string) (*http.Request, error) { src = f } - req, err := http.NewRequest(http.MethodPost, requestPath, src) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, requestPath, src) if err != nil { _ = src.Close() } @@ -58,12 +59,13 @@ type CronEntry struct { Worker *worker.Worker Config Unit ID cron.EntryID + ctx context.Context } func (ce *CronEntry) Unit() Unit { return ce.Config } // Cron initializes cron engine and registers all required worker schedules to it. -func Cron(workers []*worker.Worker, configs []Unit) ([]*CronEntry, *cron.Cron, error) { +func Cron(ctx context.Context, workers []*worker.Worker, configs []Unit) ([]*CronEntry, *cron.Cron, error) { engine := cron.New() var entries []*CronEntry for i, wrk := range workers { @@ -75,6 +77,7 @@ func Cron(workers []*worker.Worker, configs []Unit) ([]*CronEntry, *cron.Cron, e Worker: wrk, Config: cfg, Name: name, + ctx: ctx, } id, err := engine.AddJob(cronSpec.Spec, entry) if err != nil { @@ -88,7 +91,7 @@ func Cron(workers []*worker.Worker, configs []Unit) ([]*CronEntry, *cron.Cron, e } func (ce *CronEntry) Run() { - req, err := ce.Spec.Request(ce.Config.Path()) + req, err := ce.Spec.Request(ce.ctx, ce.Config.Path()) if err != nil { log.Println("failed create cron", ce.Name, "request:", err) return diff --git a/server/runner/handler.go b/server/runner/handler.go index 257d555..5aa0719 100644 --- a/server/runner/handler.go +++ b/server/runner/handler.go @@ -98,12 +98,15 @@ func (cfg Config) Create(global context.Context) (*Server, error) { if err != nil { return nil, err } - cronEntries, cronEngine, err := server.Cron(workers, units) + + ctx, cancel := context.WithCancel(global) + + cronEntries, cronEngine, err := server.Cron(ctx, workers, units) if err != nil { + cancel() return nil, err } - ctx, cancel := context.WithCancel(global) router := gin.Default() router.Use(func(gctx *gin.Context) { gctx.Request = gctx.Request.WithContext(global)