diff --git a/server/api/adapter.go b/server/api/adapter.go index 342b1b2..8633f56 100644 --- a/server/api/adapter.go +++ b/server/api/adapter.go @@ -14,14 +14,15 @@ import ( ) // Expose worker as HTTP handler: -// POST / - post task async, returns 303 See Other and location. +// POST / - post task async, returns 303 See Other and location +// PUT / - post task synchronously, supports ?wait= parameter for custom wait time // GET /:id - get task info. // POST /:id - retry task, redirects to /:id // GET /:id/completed - redirect to completed attempt (or 404 if task not yet complete) // GET /:id/attempt/:atid - get attempt result (as-is). // GET /:id/request - replay request (as-is). -func Expose(router gin.IRouter, wrk *worker.Worker) { - //TODO: wait +func Expose(router gin.IRouter, wrk *worker.Worker, defaultWaitTime time.Duration) { + handler := &workerHandler{wrk: wrk, defaultWait: defaultWaitTime} router.POST("/", func(gctx *gin.Context) { id, err := wrk.Enqueue(gctx.Request) if err != nil { @@ -32,7 +33,7 @@ func Expose(router gin.IRouter, wrk *worker.Worker) { gctx.Header("X-Correlation-Id", id) gctx.Redirect(http.StatusSeeOther, id) }) - handler := &workerHandler{wrk: wrk} + router.PUT("/", handler.createSyncTask) taskRoutes := router.Group("/:id") taskRoutes.GET("", handler.getTask) taskRoutes.POST("", handler.retry) @@ -45,7 +46,37 @@ func Expose(router gin.IRouter, wrk *worker.Worker) { } type workerHandler struct { - wrk *worker.Worker + wrk *worker.Worker + defaultWait time.Duration +} + +func (wh *workerHandler) createSyncTask(gctx *gin.Context) { + var queryParams struct { + Wait time.Duration `query:"wait" form:"wait"` + } + queryParams.Wait = wh.defaultWait + + if err := gctx.BindQuery(&queryParams); err != nil { + return + } + if queryParams.Wait <= 0 { + gctx.AbortWithStatus(http.StatusBadRequest) + return + } + + tracker, err := wh.wrk.EnqueueWithTracker(gctx.Request) + if err != nil { + log.Println("failed to enqueue:", err) + _ = gctx.AbortWithError(http.StatusInternalServerError, err) + return + } + gctx.Header("X-Correlation-Id", tracker.ID()) + select { + case <-tracker.Done(): + gctx.Redirect(http.StatusSeeOther, tracker.ID()+"/completed") + case <-time.After(queryParams.Wait): + gctx.AbortWithStatus(http.StatusGatewayTimeout) + } } // get request meta information. diff --git a/server/runner/handler.go b/server/runner/handler.go index 5aa0719..9728442 100644 --- a/server/runner/handler.go +++ b/server/runner/handler.go @@ -33,6 +33,7 @@ type Config struct { GracefulShutdown time.Duration `yaml:"graceful_shutdown"` DisableUI bool `yaml:"disable_ui"` Auth ui.Authorization `yaml:"auth,omitempty"` + DefaultWaitTime time.Duration `yaml:"wait_time,omitempty"` TLS struct { Enable bool `yaml:"enable"` Cert string `yaml:"cert"` @@ -41,6 +42,7 @@ type Config struct { } const ( + defaultWaitTime = 30 * time.Second defaultGracefulShutdown = 5 * time.Second defaultBind = "127.0.0.1:8989" ) @@ -52,6 +54,7 @@ func DefaultConfig() Config { cfg.ConfigDirectory = filepath.Join("conf.d") cfg.UIDirectory = filepath.Join("ui") cfg.GracefulShutdown = defaultGracefulShutdown + cfg.DefaultWaitTime = defaultWaitTime return cfg } @@ -89,7 +92,7 @@ func (cfg Config) SaveFile(file string) error { return ioutil.WriteFile(file, data, 0600) } -func (cfg Config) Create(global context.Context) (*Server, error) { +func (cfg Config) Create(global context.Context, defaultWaitTime time.Duration) (*Server, error) { units, err := server.Units(cfg.ConfigDirectory) if err != nil { return nil, err @@ -113,7 +116,7 @@ func (cfg Config) Create(global context.Context) (*Server, error) { gctx.Next() }) cfg.installUI(router, units, workers, cronEntries) - server.Attach(router.Group("/api/"), units, workers) + server.Attach(router.Group("/api/"), units, workers, defaultWaitTime) srv := &Server{ Handler: router, @@ -132,7 +135,7 @@ func (cfg Config) Run(global context.Context) error { ctx, cancel := context.WithCancel(global) defer cancel() - srv, err := cfg.Create(global) + srv, err := cfg.Create(global, cfg.DefaultWaitTime) if err != nil { return err } diff --git a/server/server_test.go b/server/server_test.go index 312e7e3..ad2b02b 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -52,7 +52,7 @@ func testServer(t *testing.T, cfg runner.Config, units map[string]server.Unit) * } } - srv, err := cfg.Create(context.Background()) + srv, err := cfg.Create(context.Background(), cfg.DefaultWaitTime) if !assert.NoError(t, err) { srv.Close() t.Fatal("failed to create server") @@ -175,3 +175,17 @@ func TestCron(t *testing.T) { assert.True(t, first.Complete) assert.Len(t, first.Attempts, 1) } + +func TestSync(t *testing.T) { + srv := testServer(t, runner.DefaultConfig(), map[string]server.Unit{ + "hello": { + Command: "echo hello world", + }, + }) + defer srv.Close() + + req := httptest.NewRequest(http.MethodPut, "/api/hello/", bytes.NewBufferString("hello world")) + res := httptest.NewRecorder() + srv.ServeHTTP(res, req) + assert.Equal(t, http.StatusSeeOther, res.Code) +} diff --git a/server/unit.go b/server/unit.go index 738b3e8..f6eaa09 100644 --- a/server/unit.go +++ b/server/unit.go @@ -185,18 +185,18 @@ func Workers(workdir string, configurations []Unit) ([]*worker.Worker, error) { return ans, nil } -func Handler(units []Unit, workers []*worker.Worker) http.Handler { +func Handler(units []Unit, workers []*worker.Worker, defaultWaitTime time.Duration) http.Handler { router := gin.New() - Attach(router, units, workers) + Attach(router, units, workers, defaultWaitTime) return router } -func Attach(router gin.IRouter, units []Unit, workers []*worker.Worker) { +func Attach(router gin.IRouter, units []Unit, workers []*worker.Worker, defaultWaitTime time.Duration) { for i, unit := range units { if !unit.Private { group := router.Group(unit.Path()) group.Use(unit.enableAuthorization()) - api.Expose(group, workers[i]) + api.Expose(group, workers[i], defaultWaitTime) } else { log.Println("do not expose unit", unit.Name(), "because it's private") } diff --git a/services/meta/interface.go b/services/meta/interface.go index bb4e4b5..8218a3a 100644 --- a/services/meta/interface.go +++ b/services/meta/interface.go @@ -34,3 +34,12 @@ type Request struct { Method string `json:"method"` Complete bool `json:"complete"` } + +func (rq *Request) Success() bool { + for _, item := range rq.Attempts { + if item.Code == 0 { + return true + } + } + return false +} diff --git a/worker/tracker.go b/worker/tracker.go new file mode 100644 index 0000000..2be12d2 --- /dev/null +++ b/worker/tracker.go @@ -0,0 +1,51 @@ +package worker + +import "sync/atomic" + +func newTracker(id string) *Tracker { + return &Tracker{ + id: id, + done: make(chan struct{}), + } +} + +type Tracker struct { + id string + done chan struct{} + success bool + attemptID string + finished int32 +} + +func (t *Tracker) ID() string { + return t.id +} + +func (t *Tracker) Done() <-chan struct{} { + return t.done +} + +func (t *Tracker) Success() bool { + return t.success +} + +func (t *Tracker) Attempt() string { + return t.attemptID +} + +func (t *Tracker) close() { + if atomic.CompareAndSwapInt32(&t.finished, 0, 1) { + close(t.done) + } +} + +func (t *Tracker) ok(attemptID string) { + t.attemptID = attemptID + t.success = true + t.close() +} + +func (t *Tracker) failed() { + t.success = false + t.close() +} diff --git a/worker/worker.go b/worker/worker.go index 1dd1744..4005092 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -120,6 +120,7 @@ type Worker struct { reloadMeta chan struct{} interval time.Duration sequence uint64 + trackers sync.Map // id -> *Tracker } func (mgr *Worker) init() error { @@ -156,6 +157,26 @@ func (mgr *Worker) Enqueue(req *http.Request) (string, error) { return id, err } +// Enqueue request to storage, save meta-info to meta storage and push id into processing queue. Generated ID +// always unique and returns only in case of successful enqueue. Returns Tracker to understand job processing. +// Tracking jobs is not free operation! Do not use it just because you can. +func (mgr *Worker) EnqueueWithTracker(req *http.Request) (*Tracker, error) { + id, err := mgr.saveRequest(req) + if err != nil { + return nil, err + } + track := newTracker(id) + log.Println("new request saved:", id) + mgr.trackers.Store(id, track) + err = mgr.queue.Push([]byte(id)) + if err != nil { + track.close() + mgr.trackers.Delete(id) + return nil, err + } + return track, nil +} + // Complete request manually. func (mgr *Worker) Complete(requestID string) error { err := mgr.meta.Complete(requestID) @@ -448,6 +469,7 @@ func (mgr *Worker) requestDead(ctx context.Context, id string, info *meta.Reques handler(ctx, id, info) } log.Println("request", id, "completely failed") + mgr.completeTrack(id, "", true) } func (mgr *Worker) requestSuccess(ctx context.Context, id string, info *meta.Request) { @@ -466,6 +488,7 @@ func (mgr *Worker) requestProcessed(ctx context.Context, id string, attemptID st handler(ctx, id, attemptID, info) } log.Println("request", id, "processed with attempt", attemptID) + mgr.completeTrack(id, attemptID, info.Success()) } func (mgr *Worker) restoreRequest(ctx context.Context, requestID string, info *meta.Request) (*http.Request, error) { @@ -484,6 +507,19 @@ func (mgr *Worker) restoreRequest(ctx context.Context, requestID string, info *m return req, nil } +func (mgr *Worker) completeTrack(id string, attemptID string, failed bool) { + track, ok := mgr.trackers.LoadAndDelete(id) + if !ok { + return + } + tracker := track.(*Tracker) + if failed { + tracker.failed() + } else { + tracker.ok(attemptID) + } +} + func encodeID(nsID byte, id uint64) string { var data [9]byte data[0] = nsID