diff --git a/server/internal/adapter.go b/server/internal/adapter.go index bdc96df..11b4ac5 100644 --- a/server/internal/adapter.go +++ b/server/internal/adapter.go @@ -17,6 +17,7 @@ import ( // Expose worker as HTTP handler: // POST / - post task async, returns 303 See Other and location. // GET /:id - get task info. +// POST /:id - retry task, redirects to /:id // GET /:id/completed - redirect to completed attempt (or 404 if task not yet complete) // GET /:id/attempt/:atid - get attempt result (as-is). // GET /:id/request - replay request (as-is). @@ -32,8 +33,8 @@ func Expose(router *mux.Router, wrk *worker.Worker) { writer.Header().Set("X-Correlation-Id", id) http.Redirect(writer, request, id, http.StatusSeeOther) }) - // get state: 200 with json description. For complete request - Location header will be filled. - router.Path("/{id}").Methods("GET").HandlerFunc(createTask(wrk)) + router.Path("/{id}").Methods("GET").HandlerFunc(getTask(wrk)) + router.Path("/{id}").Methods("POST").HandlerFunc(retry(wrk)) router.Path("/{id}").Methods("DELETE").HandlerFunc(completeRequest(wrk)) router.Path("/{id}/completed").Methods("GET").HandlerFunc(getComplete(wrk)) // get attempt result as-is. @@ -43,7 +44,7 @@ func Expose(router *mux.Router, wrk *worker.Worker) { } // get request meta information. -func createTask(wrk *worker.Worker) http.HandlerFunc { +func getTask(wrk *worker.Worker) http.HandlerFunc { return func(writer http.ResponseWriter, request *http.Request) { params := mux.Vars(request) requestID := params["id"] @@ -87,6 +88,21 @@ func createTask(wrk *worker.Worker) http.HandlerFunc { } } +func retry(wrk *worker.Worker) http.HandlerFunc { + return func(writer http.ResponseWriter, request *http.Request) { + params := mux.Vars(request) + requestID := params["id"] + id, err := wrk.Retry(request.Context(), requestID) + if err != nil { + log.Println("failed to retry:", err) + http.Error(writer, "failed to enqueue", http.StatusInternalServerError) + return + } + writer.Header().Set("X-Correlation-Id", id) + http.Redirect(writer, request, id, http.StatusSeeOther) + } +} + func completeRequest(wrk *worker.Worker) http.HandlerFunc { return func(writer http.ResponseWriter, request *http.Request) { params := mux.Vars(request) diff --git a/worker/worker.go b/worker/worker.go index 09a6058..7dd814d 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -252,23 +252,29 @@ func (mgr *Worker) Run(global context.Context) error { return ctx.Err() } +// Retry processing. +func (mgr *Worker) Retry(ctx context.Context, requestID string) (string, error) { + info, err := mgr.meta.Get(requestID) + if err != nil { + return "", err + } + req, err := mgr.restoreRequest(ctx, requestID, info) + if err != nil { + return "", err + } + defer req.Body.Close() + return mgr.Enqueue(req) +} + func (mgr *Worker) call(ctx context.Context, requestID string, info *meta.Request) error { // caller should ensure that request id is valid - f, err := mgr.blob.Get(requestID) + req, err := mgr.restoreRequest(ctx, requestID, info) if err != nil { return err } - defer f.Close() + defer req.Body.Close() attemptID := uuid.New().String() - req, err := http.NewRequestWithContext(ctx, info.Method, info.URI, f) - if err != nil { - return err - } - for k, v := range info.Headers { - req.Header[k] = v - } - req.Header.Set("X-Correlation-Id", requestID) req.Header.Set("X-Attempt-Id", attemptID) req.Header.Set("X-Attempt", strconv.Itoa(len(info.Attempts)+1)) @@ -442,6 +448,22 @@ func (mgr *Worker) requestProcessed(ctx context.Context, id string, attemptID st log.Println("request", id, "processed with attempt", attemptID) } +func (mgr *Worker) restoreRequest(ctx context.Context, requestID string, info *meta.Request) (*http.Request, error) { + f, err := mgr.blob.Get(requestID) + if err != nil { + return nil, err + } + req, err := http.NewRequestWithContext(ctx, info.Method, info.URI, f) + if err != nil { + _ = f.Close() + return nil, err + } + for k, v := range info.Headers { + req.Header[k] = v + } + return req, nil +} + type requeueItem struct { At time.Time ID string