add cron-like unit configuration

This commit is contained in:
Alexander Baryshnikov 2020-11-07 21:38:13 +08:00
parent 8490f012a2
commit 8f7171abc1
8 changed files with 224 additions and 12 deletions

41
_docs/cron.md Normal file
View File

@ -0,0 +1,41 @@
# Cron
Cron-like jobs allowed as part of Unit definition [thanks to robfig/cron](https://godoc.org/github.com/robfig/cron#hdr-Usage).
**Example definition:**
```yaml
# ... unit definition above ...
cron:
# every hour on the half hour
- spec: 30 * * * *
# same as above but with name to detect in UI and logs
- spec: 30 * * * *
name: named schedule
# each hour with custom payload and headers
- spec: @hourly
content: |
hello world
headers:
X-Some-Header: test-header
# each day with content from file
- spec: @daily
content_file: /path/to/content
```
Schema:
* `spec` (required, string) - cron tab specification for the time interval. See [online builder](https://crontab.guru/)
* `name` (optional, string) - name for entry to distinguish record in UI or in logs.
* `headers` (optional, map string=>string) - headers that will be used in simulated request.
* `content` (optional, string) - simulated request content.
* `content_file` (optional, string) - simulated request content file. Has less priority than `content`.
Caveats:
* **Security:** cron job ignores authorizations defined on unit level.
* **Enqueuing:** cron job will be enqueued regardless of status of previous job.
* **Errors:** if cron job can't create request (ex: `content_file` not available) - it will print error to log and
will try again later at the next schedule.

View File

@ -1,3 +1,21 @@
# Unit
* If work dir is not defined - temporary directory will be created and removed after execution for each request automatically.
* If work dir not defined - temporary directory will be created and removed after execution for each request automatically.
Schema:
* `command` (required, string) - command to execute (will be executed in a shell)
* `interval` (optional, interval) - interval between attempts
* `timeout` (optional, interval) - maximum execution timeout (enabled only for bin mode and only if positive)
* `graceful_timeout` (optional, interval) - maximum execution timeout after which SIGINT will be sent (enabled only for bin mode and only if positive).
Ie: how long to let command react on SIGTERM signal.
* `shell` (optional, string) - shell to execute command in bin mode (default - /bin/sh)
* `environment` (optional, map string=>string) - custom environment for executable (in addition to system)
* `max_request` (optional, integer) - maximum HTTP body size (enabled if positive)
* `attempts` (optional, integer) - maximum number of attempts
* `workers` (optional, integer) - concurrency level - number of parallel requests
* `mode` (optional, string) - execution mode: `bin` or `cgi`
* `workdir` (optional, string) - working directory for the worker. if empty - temporary one will be generated automatically.
* `authorization` (optional, [Authorization](authorization.md)) - request authorization
* `cron` (optional,[Cron](cron.md)) - scheduled requests

1
go.mod
View File

@ -14,6 +14,7 @@ require (
github.com/imdario/mergo v0.3.11 // indirect
github.com/jessevdk/go-flags v1.4.1-0.20200711081900-c17162fe8fd7
github.com/mitchellh/copystructure v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.0
github.com/stretchr/testify v1.4.0
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43

2
go.sum
View File

@ -184,6 +184,8 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E=
github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=

100
server/cron.go Normal file
View File

@ -0,0 +1,100 @@
package server
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"github.com/robfig/cron/v3"
"nano-run/worker"
)
type CronSpec struct {
Spec string `yaml:"spec"` // cron tab spec with seconds precision
Name string `yaml:"name,omitempty"` // optional name to distinguish in logs and ui
Headers map[string]string `yaml:"headers,omitempty"` // headers in simulated request
Content string `yaml:"content,omitempty"` // content in simulated request
ContentFile string `yaml:"content_file,omitempty"` // content file to read for request content
}
func (cs CronSpec) Validate() error {
_, err := cron.ParseStandard(cs.Spec)
return err
}
func (cs *CronSpec) Label(def string) string {
if cs.Name != "" {
return cs.Name
}
return def
}
func (cs *CronSpec) Request(requestPath string) (*http.Request, error) {
var src io.ReadCloser
if cs.Content != "" || cs.ContentFile == "" {
src = ioutil.NopCloser(bytes.NewReader([]byte(cs.Content)))
} else if f, err := os.Open(cs.ContentFile); err != nil {
return nil, err
} else {
src = f
}
req, err := http.NewRequest(http.MethodPost, requestPath, src)
if err != nil {
_ = src.Close()
}
return req, err
}
type CronEntry struct {
Name string
Spec CronSpec
Worker *worker.Worker
Config Unit
ID cron.EntryID
}
// Cron initializes cron engine and registers all required worker schedules to it.
func Cron(workers []*worker.Worker, configs []Unit) ([]*CronEntry, *cron.Cron, error) {
engine := cron.New()
var entries []*CronEntry
for i, wrk := range workers {
cfg := configs[i]
for i, cronSpec := range cfg.Cron {
name := cfg.Name() + "/" + cronSpec.Label(strconv.Itoa(i))
entry := &CronEntry{
Spec: cronSpec,
Worker: wrk,
Config: cfg,
Name: name,
}
id, err := engine.AddJob(cronSpec.Spec, entry)
if err != nil {
return nil, nil, fmt.Errorf("cron record %s: %w", name, err)
}
entry.ID = id
entries = append(entries, entry)
}
}
return entries, engine, nil
}
func (ce *CronEntry) Run() {
req, err := ce.Spec.Request(ce.Config.Path())
if err != nil {
log.Println("failed create cron", ce.Name, "request:", err)
return
}
id, err := ce.Worker.Enqueue(req)
if err != nil {
log.Println("failed enqueue cron", ce.Name, "job:", err)
return
}
log.Println("enqueued cron", ce.Name, "job with id", id)
}

View File

@ -12,6 +12,7 @@ import (
"github.com/Masterminds/sprig"
"github.com/gin-gonic/gin"
"github.com/robfig/cron/v3"
"gopkg.in/yaml.v2"
"nano-run/server"
@ -97,6 +98,11 @@ func (cfg Config) Create(global context.Context) (*Server, error) {
if err != nil {
return nil, err
}
cronEntries, cronEngine, err := server.Cron(workers, units)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(global)
router := gin.Default()
router.Use(func(gctx *gin.Context) {
@ -107,11 +113,13 @@ func (cfg Config) Create(global context.Context) (*Server, error) {
server.Attach(router.Group("/api/"), units, workers)
srv := &Server{
Handler: router,
workers: workers,
units: units,
done: make(chan struct{}),
cancel: cancel,
Handler: router,
workers: workers,
cronEngine: cronEngine,
cronEntries: cronEntries,
units: units,
done: make(chan struct{}),
cancel: cancel,
}
go srv.run(ctx)
return srv, nil
@ -191,20 +199,25 @@ func (cfg Config) useEmbeddedUI(router *gin.Engine, uiGroup gin.IRouter) {
type Server struct {
http.Handler
workers []*worker.Worker
units []server.Unit
cancel func()
done chan struct{}
err error
workers []*worker.Worker
units []server.Unit
cronEntries []*server.CronEntry
cronEngine *cron.Cron
cancel func()
done chan struct{}
err error
}
func (srv *Server) Units() []server.Unit { return srv.units }
func (srv *Server) Workers() []*worker.Worker { return srv.workers }
func (srv *Server) Close() {
for _, wrk := range srv.workers {
wrk.Close()
}
srv.cancel()
<-srv.cronEngine.Stop().Done()
<-srv.done
}
@ -213,6 +226,7 @@ func (srv *Server) Err() error {
}
func (srv *Server) run(ctx context.Context) {
srv.cronEngine.Start()
err := server.Run(ctx, srv.workers)
if err != nil {
log.Println("workers stopped:", err)

View File

@ -149,3 +149,29 @@ func Test_retryIfDataReturnedInBinMode(t *testing.T) {
}
}
func TestCron(t *testing.T) {
srv := testServer(t, runner.DefaultConfig(), map[string]server.Unit{
"hello": {
Command: "echo hello world",
Cron: []server.CronSpec{
{Spec: "@every 1s"},
},
},
})
defer srv.Close()
time.Sleep(time.Second + 100*time.Millisecond)
var first *meta.Request
err := srv.Workers()[0].Meta().Iterate(func(id string, record meta.Request) error {
first = &record
return nil
})
if !assert.NoError(t, err) {
return
}
assert.True(t, first.Complete)
assert.Len(t, first.Attempts, 1)
}

View File

@ -14,6 +14,7 @@ import (
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
@ -58,6 +59,7 @@ type Unit struct {
Basic `yaml:",inline"`
} `yaml:"basic,omitempty"` // basic authorization
} `yaml:"authorization,omitempty"`
Cron []CronSpec `yaml:"cron,omitempty"` // cron-tab like definition (see CronSpec)
name string
}
@ -99,6 +101,12 @@ func (cfg Unit) Validate() error {
if !(cfg.Mode == "bin" || cfg.Mode == "cgi" || cfg.Mode == "proxy") {
checks = append(checks, "unknown mode "+cfg.Mode)
}
for i, spec := range cfg.Cron {
err := spec.Validate()
if err != nil {
checks = append(checks, "cron "+spec.Label(strconv.Itoa(i))+": "+err.Error())
}
}
if len(checks) == 0 {
return nil
}
@ -115,6 +123,8 @@ func (cfg Unit) SaveFile(file string) error {
func (cfg Unit) Name() string { return cfg.name }
func (cfg Unit) Path() string { return "/" + cfg.name + "/" }
func (cfg Unit) Secured() bool {
return cfg.Authorization.Basic.Enable ||
cfg.Authorization.HeaderToken.Enable ||
@ -182,7 +192,7 @@ func Handler(units []Unit, workers []*worker.Worker) http.Handler {
func Attach(router gin.IRouter, units []Unit, workers []*worker.Worker) {
for i, unit := range units {
group := router.Group("/" + unit.name + "/")
group := router.Group(unit.Path())
group.Use(unit.enableAuthorization())
api.Expose(group, workers[i])
}