pass X-Correlation-Id, X-Attempt-Id, X-Attempt to worker handler

This commit is contained in:
Alexander Baryshnikov 2020-09-11 19:28:56 +08:00
parent 7a204bb0f3
commit 31f10a9412
2 changed files with 11 additions and 2 deletions

View File

@ -12,3 +12,8 @@ During restart - all incomplete tasks will queued again.
![image](https://user-images.githubusercontent.com/6597086/92578247-3085bb00-f2be-11ea-87de-e2c9d94a21fa.png)
Worker will call upstream with additional headers:
- `X-Correlation-Id` - request id
- `X-Attempt-Id` - attempt id
- `X-Attempt` - attempt num

View File

@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"runtime"
"strconv"
"sync"
"time"
@ -243,6 +244,7 @@ func (mgr *Worker) call(ctx context.Context, requestID string, info *meta.Reques
return err
}
defer f.Close()
attemptID := uuid.New().String()
req, err := http.NewRequestWithContext(ctx, info.Method, info.URI, f)
if err != nil {
@ -251,8 +253,10 @@ func (mgr *Worker) call(ctx context.Context, requestID string, info *meta.Reques
for k, v := range info.Headers {
req.Header[k] = v
}
req.Header.Set("X-Correlation-Id", requestID)
attemptID := uuid.New().String()
req.Header.Set("X-Attempt-Id", attemptID)
req.Header.Set("X-Attempt", strconv.Itoa(len(info.Attempts)+1))
var header meta.AttemptHeader
@ -377,7 +381,7 @@ func (mgr *Worker) saveRequest(req *http.Request) (string, error) {
if err != nil {
return "", err
}
return id, mgr.meta.CreateRequest(id, req.Header, req.RequestURI, req.Method)
return id, mgr.meta.CreateRequest(id, req.Header, req.URL.RequestURI(), req.Method)
}
func (mgr *Worker) requestDead(ctx context.Context, id string, info *meta.Request) {