allow forcefully complete request

This commit is contained in:
Alexander Baryshnikov 2020-09-11 20:34:31 +08:00
parent 31f76cd5b3
commit 60e9ad707d
3 changed files with 66 additions and 5 deletions

View File

@ -81,6 +81,15 @@ Body example:
Will redirect to the complete attempt or 404 Will redirect to the complete attempt or 404
## Force complete
|||
|------------|---------------------------|
| **Method** | DELET |
| **Path** | /{correlationId} |
Forcefully mark request as complete and stop processing (including re-queue)
## Get attempt ## Get attempt

View File

@ -34,6 +34,7 @@ func Expose(router *mux.Router, wrk *worker.Worker) {
}) })
// get state: 200 with json description. For complete request - Location header will be filled. // get state: 200 with json description. For complete request - Location header will be filled.
router.Path("/{id}").Methods("GET").HandlerFunc(createTask(wrk)) router.Path("/{id}").Methods("GET").HandlerFunc(createTask(wrk))
router.Path("/{id}").Methods("DELETE").HandlerFunc(completeRequest(wrk))
router.Path("/{id}/completed").Methods("GET").HandlerFunc(getComplete(wrk)) router.Path("/{id}/completed").Methods("GET").HandlerFunc(getComplete(wrk))
// get attempt result as-is. // get attempt result as-is.
router.Path("/{id}/attempt/{attemptId}").Methods("GET").HandlerFunc(getAttempt(wrk)) router.Path("/{id}/attempt/{attemptId}").Methods("GET").HandlerFunc(getAttempt(wrk))
@ -86,6 +87,28 @@ func createTask(wrk *worker.Worker) http.HandlerFunc {
} }
} }
func completeRequest(wrk *worker.Worker) http.HandlerFunc {
return func(writer http.ResponseWriter, request *http.Request) {
params := mux.Vars(request)
requestID := params["id"]
info, err := wrk.Meta().Get(requestID)
if err != nil {
log.Println("failed access request", requestID, ":", err)
http.NotFound(writer, request)
return
}
if !info.Complete {
err = wrk.Meta().Complete(requestID)
if err != nil {
log.Println("failed to mark request as complete:", err)
http.Error(writer, "failed to complete", http.StatusInternalServerError)
return
}
}
writer.WriteHeader(http.StatusNoContent)
}
}
func getComplete(wrk *worker.Worker) http.HandlerFunc { func getComplete(wrk *worker.Worker) http.HandlerFunc {
return func(writer http.ResponseWriter, request *http.Request) { return func(writer http.ResponseWriter, request *http.Request) {
params := mux.Vars(request) params := mux.Vars(request)

View File

@ -80,6 +80,7 @@ func New(tasks, requeue queue.Queue, blobs blob.Blob, meta meta.Meta) (*Worker,
requeue: requeue, requeue: requeue,
blob: blobs, blob: blobs,
meta: meta, meta: meta,
reloadMeta: make(chan struct{}, 1),
maxAttempts: defaultAttempts, maxAttempts: defaultAttempts,
interval: defaultInterval, interval: defaultInterval,
concurrency: runtime.NumCPU(), concurrency: runtime.NumCPU(),
@ -107,6 +108,7 @@ type Worker struct {
onProcess ProcessHandler onProcess ProcessHandler
maxAttempts int maxAttempts int
concurrency int concurrency int
reloadMeta chan struct{}
interval time.Duration interval time.Duration
} }
@ -136,6 +138,19 @@ func (mgr *Worker) Enqueue(req *http.Request) (string, error) {
return id, err return id, err
} }
func (mgr *Worker) Complete(requestID string) error {
err := mgr.meta.Complete(requestID)
if err != nil {
return err
}
select {
case mgr.reloadMeta <- struct{}{}:
default:
}
return nil
}
func (mgr *Worker) OnSuccess(handler CompleteHandler) *Worker { func (mgr *Worker) OnSuccess(handler CompleteHandler) *Worker {
mgr.onSuccess = handler mgr.onSuccess = handler
return mgr return mgr
@ -322,7 +337,8 @@ func (mgr *Worker) processQueueItem(ctx context.Context) error {
return fmt.Errorf("get request %s meta info: %w", id, err) return fmt.Errorf("get request %s meta info: %w", id, err)
} }
if info.Complete { if info.Complete {
return fmt.Errorf("request %s already complete", id) log.Printf("request %s already complete", id)
return nil
} }
err = mgr.call(ctx, id, info) err = mgr.call(ctx, id, info)
if err == nil { if err == nil {
@ -347,10 +363,23 @@ func (mgr *Worker) processReQueueItem(ctx context.Context) error {
d := time.Since(item.At) d := time.Since(item.At)
if d < mgr.interval { if d < mgr.interval {
select { var ok = false
case <-time.After(mgr.interval - d): for !ok {
case <-ctx.Done(): info, err := mgr.meta.Get(item.ID)
return ctx.Err() if err != nil {
return fmt.Errorf("re-queue: get meta %s: %w", item.ID, err)
}
if info.Complete {
log.Printf("re-queue: %s already complete", item.ID)
return nil
}
select {
case <-time.After(mgr.interval - d):
ok = true
case <-mgr.reloadMeta:
case <-ctx.Done():
return ctx.Err()
}
} }
} }
return mgr.queue.Push([]byte(item.ID)) return mgr.queue.Push([]byte(item.ID))