add retry

This commit is contained in:
Alexander Baryshnikov 2020-09-17 19:10:28 +08:00
parent e097bb0dbd
commit f714097f1b
2 changed files with 51 additions and 13 deletions

View File

@ -17,6 +17,7 @@ import (
// Expose worker as HTTP handler:
// POST / - post task async, returns 303 See Other and location.
// GET /:id - get task info.
// POST /:id - retry task, redirects to /:id
// GET /:id/completed - redirect to completed attempt (or 404 if task not yet complete)
// GET /:id/attempt/:atid - get attempt result (as-is).
// GET /:id/request - replay request (as-is).
@ -32,8 +33,8 @@ func Expose(router *mux.Router, wrk *worker.Worker) {
writer.Header().Set("X-Correlation-Id", id)
http.Redirect(writer, request, id, http.StatusSeeOther)
})
// get state: 200 with json description. For complete request - Location header will be filled.
router.Path("/{id}").Methods("GET").HandlerFunc(createTask(wrk))
router.Path("/{id}").Methods("GET").HandlerFunc(getTask(wrk))
router.Path("/{id}").Methods("POST").HandlerFunc(retry(wrk))
router.Path("/{id}").Methods("DELETE").HandlerFunc(completeRequest(wrk))
router.Path("/{id}/completed").Methods("GET").HandlerFunc(getComplete(wrk))
// get attempt result as-is.
@ -43,7 +44,7 @@ func Expose(router *mux.Router, wrk *worker.Worker) {
}
// get request meta information.
func createTask(wrk *worker.Worker) http.HandlerFunc {
func getTask(wrk *worker.Worker) http.HandlerFunc {
return func(writer http.ResponseWriter, request *http.Request) {
params := mux.Vars(request)
requestID := params["id"]
@ -87,6 +88,21 @@ func createTask(wrk *worker.Worker) http.HandlerFunc {
}
}
func retry(wrk *worker.Worker) http.HandlerFunc {
return func(writer http.ResponseWriter, request *http.Request) {
params := mux.Vars(request)
requestID := params["id"]
id, err := wrk.Retry(request.Context(), requestID)
if err != nil {
log.Println("failed to retry:", err)
http.Error(writer, "failed to enqueue", http.StatusInternalServerError)
return
}
writer.Header().Set("X-Correlation-Id", id)
http.Redirect(writer, request, id, http.StatusSeeOther)
}
}
func completeRequest(wrk *worker.Worker) http.HandlerFunc {
return func(writer http.ResponseWriter, request *http.Request) {
params := mux.Vars(request)

View File

@ -252,23 +252,29 @@ func (mgr *Worker) Run(global context.Context) error {
return ctx.Err()
}
// Retry processing.
func (mgr *Worker) Retry(ctx context.Context, requestID string) (string, error) {
info, err := mgr.meta.Get(requestID)
if err != nil {
return "", err
}
req, err := mgr.restoreRequest(ctx, requestID, info)
if err != nil {
return "", err
}
defer req.Body.Close()
return mgr.Enqueue(req)
}
func (mgr *Worker) call(ctx context.Context, requestID string, info *meta.Request) error {
// caller should ensure that request id is valid
f, err := mgr.blob.Get(requestID)
req, err := mgr.restoreRequest(ctx, requestID, info)
if err != nil {
return err
}
defer f.Close()
defer req.Body.Close()
attemptID := uuid.New().String()
req, err := http.NewRequestWithContext(ctx, info.Method, info.URI, f)
if err != nil {
return err
}
for k, v := range info.Headers {
req.Header[k] = v
}
req.Header.Set("X-Correlation-Id", requestID)
req.Header.Set("X-Attempt-Id", attemptID)
req.Header.Set("X-Attempt", strconv.Itoa(len(info.Attempts)+1))
@ -442,6 +448,22 @@ func (mgr *Worker) requestProcessed(ctx context.Context, id string, attemptID st
log.Println("request", id, "processed with attempt", attemptID)
}
func (mgr *Worker) restoreRequest(ctx context.Context, requestID string, info *meta.Request) (*http.Request, error) {
f, err := mgr.blob.Get(requestID)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, info.Method, info.URI, f)
if err != nil {
_ = f.Close()
return nil, err
}
for k, v := range info.Headers {
req.Header[k] = v
}
return req, nil
}
type requeueItem struct {
At time.Time
ID string