add pseudo-sync mode

This commit is contained in:
Alexander Baryshnikov 2020-11-10 01:57:57 +08:00
parent 5fc0adf8e6
commit bae41da6cb
7 changed files with 157 additions and 13 deletions

View File

@ -14,14 +14,15 @@ import (
)
// Expose worker as HTTP handler:
// POST / - post task async, returns 303 See Other and location.
// POST / - post task async, returns 303 See Other and location
// PUT / - post task synchronously, supports ?wait=<duration> parameter for custom wait time
// GET /:id - get task info.
// POST /:id - retry task, redirects to /:id
// GET /:id/completed - redirect to completed attempt (or 404 if task not yet complete)
// GET /:id/attempt/:atid - get attempt result (as-is).
// GET /:id/request - replay request (as-is).
func Expose(router gin.IRouter, wrk *worker.Worker) {
//TODO: wait
func Expose(router gin.IRouter, wrk *worker.Worker, defaultWaitTime time.Duration) {
handler := &workerHandler{wrk: wrk, defaultWait: defaultWaitTime}
router.POST("/", func(gctx *gin.Context) {
id, err := wrk.Enqueue(gctx.Request)
if err != nil {
@ -32,7 +33,7 @@ func Expose(router gin.IRouter, wrk *worker.Worker) {
gctx.Header("X-Correlation-Id", id)
gctx.Redirect(http.StatusSeeOther, id)
})
handler := &workerHandler{wrk: wrk}
router.PUT("/", handler.createSyncTask)
taskRoutes := router.Group("/:id")
taskRoutes.GET("", handler.getTask)
taskRoutes.POST("", handler.retry)
@ -45,7 +46,37 @@ func Expose(router gin.IRouter, wrk *worker.Worker) {
}
type workerHandler struct {
wrk *worker.Worker
wrk *worker.Worker
defaultWait time.Duration
}
func (wh *workerHandler) createSyncTask(gctx *gin.Context) {
var queryParams struct {
Wait time.Duration `query:"wait" form:"wait"`
}
queryParams.Wait = wh.defaultWait
if err := gctx.BindQuery(&queryParams); err != nil {
return
}
if queryParams.Wait <= 0 {
gctx.AbortWithStatus(http.StatusBadRequest)
return
}
tracker, err := wh.wrk.EnqueueWithTracker(gctx.Request)
if err != nil {
log.Println("failed to enqueue:", err)
_ = gctx.AbortWithError(http.StatusInternalServerError, err)
return
}
gctx.Header("X-Correlation-Id", tracker.ID())
select {
case <-tracker.Done():
gctx.Redirect(http.StatusSeeOther, tracker.ID()+"/completed")
case <-time.After(queryParams.Wait):
gctx.AbortWithStatus(http.StatusGatewayTimeout)
}
}
// get request meta information.

View File

@ -33,6 +33,7 @@ type Config struct {
GracefulShutdown time.Duration `yaml:"graceful_shutdown"`
DisableUI bool `yaml:"disable_ui"`
Auth ui.Authorization `yaml:"auth,omitempty"`
DefaultWaitTime time.Duration `yaml:"wait_time,omitempty"`
TLS struct {
Enable bool `yaml:"enable"`
Cert string `yaml:"cert"`
@ -41,6 +42,7 @@ type Config struct {
}
const (
defaultWaitTime = 30 * time.Second
defaultGracefulShutdown = 5 * time.Second
defaultBind = "127.0.0.1:8989"
)
@ -52,6 +54,7 @@ func DefaultConfig() Config {
cfg.ConfigDirectory = filepath.Join("conf.d")
cfg.UIDirectory = filepath.Join("ui")
cfg.GracefulShutdown = defaultGracefulShutdown
cfg.DefaultWaitTime = defaultWaitTime
return cfg
}
@ -89,7 +92,7 @@ func (cfg Config) SaveFile(file string) error {
return ioutil.WriteFile(file, data, 0600)
}
func (cfg Config) Create(global context.Context) (*Server, error) {
func (cfg Config) Create(global context.Context, defaultWaitTime time.Duration) (*Server, error) {
units, err := server.Units(cfg.ConfigDirectory)
if err != nil {
return nil, err
@ -113,7 +116,7 @@ func (cfg Config) Create(global context.Context) (*Server, error) {
gctx.Next()
})
cfg.installUI(router, units, workers, cronEntries)
server.Attach(router.Group("/api/"), units, workers)
server.Attach(router.Group("/api/"), units, workers, defaultWaitTime)
srv := &Server{
Handler: router,
@ -132,7 +135,7 @@ func (cfg Config) Run(global context.Context) error {
ctx, cancel := context.WithCancel(global)
defer cancel()
srv, err := cfg.Create(global)
srv, err := cfg.Create(global, cfg.DefaultWaitTime)
if err != nil {
return err
}

View File

@ -52,7 +52,7 @@ func testServer(t *testing.T, cfg runner.Config, units map[string]server.Unit) *
}
}
srv, err := cfg.Create(context.Background())
srv, err := cfg.Create(context.Background(), cfg.DefaultWaitTime)
if !assert.NoError(t, err) {
srv.Close()
t.Fatal("failed to create server")
@ -175,3 +175,17 @@ func TestCron(t *testing.T) {
assert.True(t, first.Complete)
assert.Len(t, first.Attempts, 1)
}
func TestSync(t *testing.T) {
srv := testServer(t, runner.DefaultConfig(), map[string]server.Unit{
"hello": {
Command: "echo hello world",
},
})
defer srv.Close()
req := httptest.NewRequest(http.MethodPut, "/api/hello/", bytes.NewBufferString("hello world"))
res := httptest.NewRecorder()
srv.ServeHTTP(res, req)
assert.Equal(t, http.StatusSeeOther, res.Code)
}

View File

@ -185,18 +185,18 @@ func Workers(workdir string, configurations []Unit) ([]*worker.Worker, error) {
return ans, nil
}
func Handler(units []Unit, workers []*worker.Worker) http.Handler {
func Handler(units []Unit, workers []*worker.Worker, defaultWaitTime time.Duration) http.Handler {
router := gin.New()
Attach(router, units, workers)
Attach(router, units, workers, defaultWaitTime)
return router
}
func Attach(router gin.IRouter, units []Unit, workers []*worker.Worker) {
func Attach(router gin.IRouter, units []Unit, workers []*worker.Worker, defaultWaitTime time.Duration) {
for i, unit := range units {
if !unit.Private {
group := router.Group(unit.Path())
group.Use(unit.enableAuthorization())
api.Expose(group, workers[i])
api.Expose(group, workers[i], defaultWaitTime)
} else {
log.Println("do not expose unit", unit.Name(), "because it's private")
}

View File

@ -34,3 +34,12 @@ type Request struct {
Method string `json:"method"`
Complete bool `json:"complete"`
}
func (rq *Request) Success() bool {
for _, item := range rq.Attempts {
if item.Code == 0 {
return true
}
}
return false
}

51
worker/tracker.go Normal file
View File

@ -0,0 +1,51 @@
package worker
import "sync/atomic"
func newTracker(id string) *Tracker {
return &Tracker{
id: id,
done: make(chan struct{}),
}
}
type Tracker struct {
id string
done chan struct{}
success bool
attemptID string
finished int32
}
func (t *Tracker) ID() string {
return t.id
}
func (t *Tracker) Done() <-chan struct{} {
return t.done
}
func (t *Tracker) Success() bool {
return t.success
}
func (t *Tracker) Attempt() string {
return t.attemptID
}
func (t *Tracker) close() {
if atomic.CompareAndSwapInt32(&t.finished, 0, 1) {
close(t.done)
}
}
func (t *Tracker) ok(attemptID string) {
t.attemptID = attemptID
t.success = true
t.close()
}
func (t *Tracker) failed() {
t.success = false
t.close()
}

View File

@ -120,6 +120,7 @@ type Worker struct {
reloadMeta chan struct{}
interval time.Duration
sequence uint64
trackers sync.Map // id -> *Tracker
}
func (mgr *Worker) init() error {
@ -156,6 +157,26 @@ func (mgr *Worker) Enqueue(req *http.Request) (string, error) {
return id, err
}
// Enqueue request to storage, save meta-info to meta storage and push id into processing queue. Generated ID
// always unique and returns only in case of successful enqueue. Returns Tracker to understand job processing.
// Tracking jobs is not free operation! Do not use it just because you can.
func (mgr *Worker) EnqueueWithTracker(req *http.Request) (*Tracker, error) {
id, err := mgr.saveRequest(req)
if err != nil {
return nil, err
}
track := newTracker(id)
log.Println("new request saved:", id)
mgr.trackers.Store(id, track)
err = mgr.queue.Push([]byte(id))
if err != nil {
track.close()
mgr.trackers.Delete(id)
return nil, err
}
return track, nil
}
// Complete request manually.
func (mgr *Worker) Complete(requestID string) error {
err := mgr.meta.Complete(requestID)
@ -448,6 +469,7 @@ func (mgr *Worker) requestDead(ctx context.Context, id string, info *meta.Reques
handler(ctx, id, info)
}
log.Println("request", id, "completely failed")
mgr.completeTrack(id, "", true)
}
func (mgr *Worker) requestSuccess(ctx context.Context, id string, info *meta.Request) {
@ -466,6 +488,7 @@ func (mgr *Worker) requestProcessed(ctx context.Context, id string, attemptID st
handler(ctx, id, attemptID, info)
}
log.Println("request", id, "processed with attempt", attemptID)
mgr.completeTrack(id, attemptID, info.Success())
}
func (mgr *Worker) restoreRequest(ctx context.Context, requestID string, info *meta.Request) (*http.Request, error) {
@ -484,6 +507,19 @@ func (mgr *Worker) restoreRequest(ctx context.Context, requestID string, info *m
return req, nil
}
func (mgr *Worker) completeTrack(id string, attemptID string, failed bool) {
track, ok := mgr.trackers.LoadAndDelete(id)
if !ok {
return
}
tracker := track.(*Tracker)
if failed {
tracker.failed()
} else {
tracker.ok(attemptID)
}
}
func encodeID(nsID byte, id uint64) string {
var data [9]byte
data[0] = nsID