diff --git a/_docs/cron.md b/_docs/cron.md new file mode 100644 index 0000000..27a7b4e --- /dev/null +++ b/_docs/cron.md @@ -0,0 +1,41 @@ +# Cron + + +Cron-like jobs allowed as part of Unit definition [thanks to robfig/cron](https://godoc.org/github.com/robfig/cron#hdr-Usage). + +**Example definition:** + +```yaml +# ... unit definition above ... +cron: + # every hour on the half hour + - spec: 30 * * * * + # same as above but with name to detect in UI and logs + - spec: 30 * * * * + name: named schedule + # each hour with custom payload and headers + - spec: @hourly + content: | + hello world + headers: + X-Some-Header: test-header + # each day with content from file + - spec: @daily + content_file: /path/to/content +``` + + +Schema: + +* `spec` (required, string) - cron tab specification for the time interval. See [online builder](https://crontab.guru/) +* `name` (optional, string) - name for entry to distinguish record in UI or in logs. +* `headers` (optional, map string=>string) - headers that will be used in simulated request. +* `content` (optional, string) - simulated request content. +* `content_file` (optional, string) - simulated request content file. Has less priority than `content`. + +Caveats: + +* **Security:** cron job ignores authorizations defined on unit level. +* **Enqueuing:** cron job will be enqueued regardless of status of previous job. +* **Errors:** if cron job can't create request (ex: `content_file` not available) - it will print error to log and +will try again later at the next schedule. \ No newline at end of file diff --git a/_docs/unit.md b/_docs/unit.md index da58894..5430a9b 100644 --- a/_docs/unit.md +++ b/_docs/unit.md @@ -1,3 +1,21 @@ # Unit -* If work dir is not defined - temporary directory will be created and removed after execution for each request automatically. \ No newline at end of file +* If work dir not defined - temporary directory will be created and removed after execution for each request automatically. + + +Schema: + +* `command` (required, string) - command to execute (will be executed in a shell) +* `interval` (optional, interval) - interval between attempts +* `timeout` (optional, interval) - maximum execution timeout (enabled only for bin mode and only if positive) +* `graceful_timeout` (optional, interval) - maximum execution timeout after which SIGINT will be sent (enabled only for bin mode and only if positive). +Ie: how long to let command react on SIGTERM signal. +* `shell` (optional, string) - shell to execute command in bin mode (default - /bin/sh) +* `environment` (optional, map string=>string) - custom environment for executable (in addition to system) +* `max_request` (optional, integer) - maximum HTTP body size (enabled if positive) +* `attempts` (optional, integer) - maximum number of attempts +* `workers` (optional, integer) - concurrency level - number of parallel requests +* `mode` (optional, string) - execution mode: `bin` or `cgi` +* `workdir` (optional, string) - working directory for the worker. if empty - temporary one will be generated automatically. +* `authorization` (optional, [Authorization](authorization.md)) - request authorization +* `cron` (optional,[Cron](cron.md)) - scheduled requests \ No newline at end of file diff --git a/go.mod b/go.mod index 7d60a40..c2a47f9 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/imdario/mergo v0.3.11 // indirect github.com/jessevdk/go-flags v1.4.1-0.20200711081900-c17162fe8fd7 github.com/mitchellh/copystructure v1.0.0 // indirect + github.com/robfig/cron/v3 v3.0.0 github.com/stretchr/testify v1.4.0 golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43 diff --git a/go.sum b/go.sum index d3b8be6..161701b 100644 --- a/go.sum +++ b/go.sum @@ -184,6 +184,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= +github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= diff --git a/server/cron.go b/server/cron.go new file mode 100644 index 0000000..39726df --- /dev/null +++ b/server/cron.go @@ -0,0 +1,100 @@ +package server + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "os" + "strconv" + + "github.com/robfig/cron/v3" + + "nano-run/worker" +) + +type CronSpec struct { + Spec string `yaml:"spec"` // cron tab spec with seconds precision + Name string `yaml:"name,omitempty"` // optional name to distinguish in logs and ui + Headers map[string]string `yaml:"headers,omitempty"` // headers in simulated request + Content string `yaml:"content,omitempty"` // content in simulated request + ContentFile string `yaml:"content_file,omitempty"` // content file to read for request content +} + +func (cs CronSpec) Validate() error { + _, err := cron.ParseStandard(cs.Spec) + return err +} + +func (cs *CronSpec) Label(def string) string { + if cs.Name != "" { + return cs.Name + } + return def +} + +func (cs *CronSpec) Request(requestPath string) (*http.Request, error) { + var src io.ReadCloser + if cs.Content != "" || cs.ContentFile == "" { + src = ioutil.NopCloser(bytes.NewReader([]byte(cs.Content))) + } else if f, err := os.Open(cs.ContentFile); err != nil { + return nil, err + } else { + src = f + } + + req, err := http.NewRequest(http.MethodPost, requestPath, src) + if err != nil { + _ = src.Close() + } + return req, err +} + +type CronEntry struct { + Name string + Spec CronSpec + Worker *worker.Worker + Config Unit + ID cron.EntryID +} + +// Cron initializes cron engine and registers all required worker schedules to it. +func Cron(workers []*worker.Worker, configs []Unit) ([]*CronEntry, *cron.Cron, error) { + engine := cron.New() + var entries []*CronEntry + for i, wrk := range workers { + cfg := configs[i] + for i, cronSpec := range cfg.Cron { + name := cfg.Name() + "/" + cronSpec.Label(strconv.Itoa(i)) + entry := &CronEntry{ + Spec: cronSpec, + Worker: wrk, + Config: cfg, + Name: name, + } + id, err := engine.AddJob(cronSpec.Spec, entry) + if err != nil { + return nil, nil, fmt.Errorf("cron record %s: %w", name, err) + } + entry.ID = id + entries = append(entries, entry) + } + } + return entries, engine, nil +} + +func (ce *CronEntry) Run() { + req, err := ce.Spec.Request(ce.Config.Path()) + if err != nil { + log.Println("failed create cron", ce.Name, "request:", err) + return + } + id, err := ce.Worker.Enqueue(req) + if err != nil { + log.Println("failed enqueue cron", ce.Name, "job:", err) + return + } + log.Println("enqueued cron", ce.Name, "job with id", id) +} diff --git a/server/runner/handler.go b/server/runner/handler.go index 05abd18..8c5c869 100644 --- a/server/runner/handler.go +++ b/server/runner/handler.go @@ -12,6 +12,7 @@ import ( "github.com/Masterminds/sprig" "github.com/gin-gonic/gin" + "github.com/robfig/cron/v3" "gopkg.in/yaml.v2" "nano-run/server" @@ -97,6 +98,11 @@ func (cfg Config) Create(global context.Context) (*Server, error) { if err != nil { return nil, err } + cronEntries, cronEngine, err := server.Cron(workers, units) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(global) router := gin.Default() router.Use(func(gctx *gin.Context) { @@ -107,11 +113,13 @@ func (cfg Config) Create(global context.Context) (*Server, error) { server.Attach(router.Group("/api/"), units, workers) srv := &Server{ - Handler: router, - workers: workers, - units: units, - done: make(chan struct{}), - cancel: cancel, + Handler: router, + workers: workers, + cronEngine: cronEngine, + cronEntries: cronEntries, + units: units, + done: make(chan struct{}), + cancel: cancel, } go srv.run(ctx) return srv, nil @@ -191,20 +199,25 @@ func (cfg Config) useEmbeddedUI(router *gin.Engine, uiGroup gin.IRouter) { type Server struct { http.Handler - workers []*worker.Worker - units []server.Unit - cancel func() - done chan struct{} - err error + workers []*worker.Worker + units []server.Unit + cronEntries []*server.CronEntry + cronEngine *cron.Cron + cancel func() + done chan struct{} + err error } func (srv *Server) Units() []server.Unit { return srv.units } +func (srv *Server) Workers() []*worker.Worker { return srv.workers } + func (srv *Server) Close() { for _, wrk := range srv.workers { wrk.Close() } srv.cancel() + <-srv.cronEngine.Stop().Done() <-srv.done } @@ -213,6 +226,7 @@ func (srv *Server) Err() error { } func (srv *Server) run(ctx context.Context) { + srv.cronEngine.Start() err := server.Run(ctx, srv.workers) if err != nil { log.Println("workers stopped:", err) diff --git a/server/server_test.go b/server/server_test.go index cb6e5ae..312e7e3 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -149,3 +149,29 @@ func Test_retryIfDataReturnedInBinMode(t *testing.T) { } } + +func TestCron(t *testing.T) { + srv := testServer(t, runner.DefaultConfig(), map[string]server.Unit{ + "hello": { + Command: "echo hello world", + Cron: []server.CronSpec{ + {Spec: "@every 1s"}, + }, + }, + }) + defer srv.Close() + time.Sleep(time.Second + 100*time.Millisecond) + + var first *meta.Request + err := srv.Workers()[0].Meta().Iterate(func(id string, record meta.Request) error { + first = &record + return nil + }) + + if !assert.NoError(t, err) { + return + } + + assert.True(t, first.Complete) + assert.Len(t, first.Attempts, 1) +} diff --git a/server/unit.go b/server/unit.go index 6a22e20..719d073 100644 --- a/server/unit.go +++ b/server/unit.go @@ -14,6 +14,7 @@ import ( "os" "path/filepath" "runtime" + "strconv" "strings" "sync" "time" @@ -58,6 +59,7 @@ type Unit struct { Basic `yaml:",inline"` } `yaml:"basic,omitempty"` // basic authorization } `yaml:"authorization,omitempty"` + Cron []CronSpec `yaml:"cron,omitempty"` // cron-tab like definition (see CronSpec) name string } @@ -99,6 +101,12 @@ func (cfg Unit) Validate() error { if !(cfg.Mode == "bin" || cfg.Mode == "cgi" || cfg.Mode == "proxy") { checks = append(checks, "unknown mode "+cfg.Mode) } + for i, spec := range cfg.Cron { + err := spec.Validate() + if err != nil { + checks = append(checks, "cron "+spec.Label(strconv.Itoa(i))+": "+err.Error()) + } + } if len(checks) == 0 { return nil } @@ -115,6 +123,8 @@ func (cfg Unit) SaveFile(file string) error { func (cfg Unit) Name() string { return cfg.name } +func (cfg Unit) Path() string { return "/" + cfg.name + "/" } + func (cfg Unit) Secured() bool { return cfg.Authorization.Basic.Enable || cfg.Authorization.HeaderToken.Enable || @@ -182,7 +192,7 @@ func Handler(units []Unit, workers []*worker.Worker) http.Handler { func Attach(router gin.IRouter, units []Unit, workers []*worker.Worker) { for i, unit := range units { - group := router.Group("/" + unit.name + "/") + group := router.Group(unit.Path()) group.Use(unit.enableAuthorization()) api.Expose(group, workers[i]) }