add context to scheduler

This commit is contained in:
Alexander Baryshnikov 2020-11-07 23:07:54 +08:00
parent 6fb7d88306
commit b397ec0b9d
2 changed files with 12 additions and 6 deletions

View File

@ -2,6 +2,7 @@ package server
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -35,7 +36,7 @@ func (cs *CronSpec) Label(def string) string {
return def return def
} }
func (cs *CronSpec) Request(requestPath string) (*http.Request, error) { func (cs *CronSpec) Request(ctx context.Context, requestPath string) (*http.Request, error) {
var src io.ReadCloser var src io.ReadCloser
if cs.Content != "" || cs.ContentFile == "" { if cs.Content != "" || cs.ContentFile == "" {
src = ioutil.NopCloser(bytes.NewReader([]byte(cs.Content))) src = ioutil.NopCloser(bytes.NewReader([]byte(cs.Content)))
@ -45,7 +46,7 @@ func (cs *CronSpec) Request(requestPath string) (*http.Request, error) {
src = f src = f
} }
req, err := http.NewRequest(http.MethodPost, requestPath, src) req, err := http.NewRequestWithContext(ctx, http.MethodPost, requestPath, src)
if err != nil { if err != nil {
_ = src.Close() _ = src.Close()
} }
@ -58,12 +59,13 @@ type CronEntry struct {
Worker *worker.Worker Worker *worker.Worker
Config Unit Config Unit
ID cron.EntryID ID cron.EntryID
ctx context.Context
} }
func (ce *CronEntry) Unit() Unit { return ce.Config } func (ce *CronEntry) Unit() Unit { return ce.Config }
// Cron initializes cron engine and registers all required worker schedules to it. // Cron initializes cron engine and registers all required worker schedules to it.
func Cron(workers []*worker.Worker, configs []Unit) ([]*CronEntry, *cron.Cron, error) { func Cron(ctx context.Context, workers []*worker.Worker, configs []Unit) ([]*CronEntry, *cron.Cron, error) {
engine := cron.New() engine := cron.New()
var entries []*CronEntry var entries []*CronEntry
for i, wrk := range workers { for i, wrk := range workers {
@ -75,6 +77,7 @@ func Cron(workers []*worker.Worker, configs []Unit) ([]*CronEntry, *cron.Cron, e
Worker: wrk, Worker: wrk,
Config: cfg, Config: cfg,
Name: name, Name: name,
ctx: ctx,
} }
id, err := engine.AddJob(cronSpec.Spec, entry) id, err := engine.AddJob(cronSpec.Spec, entry)
if err != nil { if err != nil {
@ -88,7 +91,7 @@ func Cron(workers []*worker.Worker, configs []Unit) ([]*CronEntry, *cron.Cron, e
} }
func (ce *CronEntry) Run() { func (ce *CronEntry) Run() {
req, err := ce.Spec.Request(ce.Config.Path()) req, err := ce.Spec.Request(ce.ctx, ce.Config.Path())
if err != nil { if err != nil {
log.Println("failed create cron", ce.Name, "request:", err) log.Println("failed create cron", ce.Name, "request:", err)
return return

View File

@ -98,12 +98,15 @@ func (cfg Config) Create(global context.Context) (*Server, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
cronEntries, cronEngine, err := server.Cron(workers, units)
ctx, cancel := context.WithCancel(global)
cronEntries, cronEngine, err := server.Cron(ctx, workers, units)
if err != nil { if err != nil {
cancel()
return nil, err return nil, err
} }
ctx, cancel := context.WithCancel(global)
router := gin.Default() router := gin.Default()
router.Use(func(gctx *gin.Context) { router.Use(func(gctx *gin.Context) {
gctx.Request = gctx.Request.WithContext(global) gctx.Request = gctx.Request.WithContext(global)