416 lines
8.7 KiB
Go
416 lines
8.7 KiB
Go
|
package worker
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"log"
|
||
|
"net/http"
|
||
|
"os"
|
||
|
"path/filepath"
|
||
|
"runtime"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/google/uuid"
|
||
|
|
||
|
"nano-run/services/blob"
|
||
|
"nano-run/services/blob/fsblob"
|
||
|
"nano-run/services/meta"
|
||
|
"nano-run/services/meta/micrometa"
|
||
|
"nano-run/services/queue"
|
||
|
"nano-run/services/queue/microqueue"
|
||
|
)
|
||
|
|
||
|
type (
|
||
|
CompleteHandler func(ctx context.Context, requestID string, info *meta.Request)
|
||
|
ProcessHandler func(ctx context.Context, requestID, attemptID string, info *meta.Request)
|
||
|
)
|
||
|
|
||
|
// defaultAttempts is the attempt limit a Worker created by New starts with.
const defaultAttempts = 3

// defaultInterval is the re-queue interval a Worker created by New starts with.
const defaultInterval = 3 * time.Second

// minimalFailedCode is the lowest response status code that is treated as a
// failed attempt.
const minimalFailedCode = 500
|
||
|
|
||
|
func Default(location string) (*Worker, error) {
|
||
|
path := filepath.Join(location, "blobs")
|
||
|
err := os.MkdirAll(path, 0755)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
storage := fsblob.NewCheck(path, func(id string) bool {
|
||
|
_, err := uuid.Parse(id)
|
||
|
return err == nil
|
||
|
})
|
||
|
|
||
|
taskQueue, err := microqueue.NewMicroQueue(filepath.Join(location, "queue"))
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
requeue, err := microqueue.NewMicroQueue(filepath.Join(location, "requeue"))
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
metaStorage, err := micrometa.NewMetaStorage(filepath.Join(location, "meta"))
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
cleanup := func() {
|
||
|
_ = requeue.Close()
|
||
|
_ = taskQueue.Close()
|
||
|
_ = metaStorage.Close()
|
||
|
}
|
||
|
|
||
|
wrk, err := New(taskQueue, requeue, storage, metaStorage)
|
||
|
if err != nil {
|
||
|
cleanup()
|
||
|
return nil, err
|
||
|
}
|
||
|
wrk.cleanup = cleanup
|
||
|
return wrk, nil
|
||
|
}
|
||
|
|
||
|
func New(tasks, requeue queue.Queue, blobs blob.Blob, meta meta.Meta) (*Worker, error) {
|
||
|
wrk := &Worker{
|
||
|
queue: tasks,
|
||
|
requeue: requeue,
|
||
|
blob: blobs,
|
||
|
meta: meta,
|
||
|
maxAttempts: defaultAttempts,
|
||
|
interval: defaultInterval,
|
||
|
concurrency: runtime.NumCPU(),
|
||
|
handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
|
||
|
writer.WriteHeader(http.StatusNoContent)
|
||
|
}),
|
||
|
}
|
||
|
err := wrk.init()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return wrk, nil
|
||
|
}
|
||
|
|
||
|
type Worker struct {
|
||
|
queue queue.Queue
|
||
|
requeue queue.Queue
|
||
|
blob blob.Blob
|
||
|
meta meta.Meta
|
||
|
handler http.Handler
|
||
|
cleanup func()
|
||
|
|
||
|
onDead CompleteHandler
|
||
|
onSuccess CompleteHandler
|
||
|
onProcess ProcessHandler
|
||
|
maxAttempts int
|
||
|
concurrency int
|
||
|
interval time.Duration
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) init() error {
|
||
|
return mgr.meta.Iterate(func(id string, record meta.Request) error {
|
||
|
if !record.Complete {
|
||
|
log.Println("found incomplete job", id)
|
||
|
return mgr.queue.Push([]byte(id))
|
||
|
}
|
||
|
return nil
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) Close() {
|
||
|
if fn := mgr.cleanup; fn != nil {
|
||
|
fn()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) Enqueue(req *http.Request) (string, error) {
|
||
|
id, err := mgr.saveRequest(req)
|
||
|
if err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
log.Println("new request saved:", id)
|
||
|
err = mgr.queue.Push([]byte(id))
|
||
|
return id, err
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) OnSuccess(handler CompleteHandler) *Worker {
|
||
|
mgr.onSuccess = handler
|
||
|
return mgr
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) OnDead(handler CompleteHandler) *Worker {
|
||
|
mgr.onDead = handler
|
||
|
return mgr
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) OnProcess(handler ProcessHandler) *Worker {
|
||
|
mgr.onProcess = handler
|
||
|
return mgr
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) Handler(handler http.Handler) *Worker {
|
||
|
mgr.handler = handler
|
||
|
return mgr
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) HandlerFunc(fn http.HandlerFunc) *Worker {
|
||
|
mgr.handler = fn
|
||
|
return mgr
|
||
|
}
|
||
|
|
||
|
// Attempts number of 500x requests.
|
||
|
func (mgr *Worker) Attempts(max int) *Worker {
|
||
|
mgr.maxAttempts = max
|
||
|
return mgr
|
||
|
}
|
||
|
|
||
|
// Interval between attempts.
|
||
|
func (mgr *Worker) Interval(duration time.Duration) *Worker {
|
||
|
mgr.interval = duration
|
||
|
return mgr
|
||
|
}
|
||
|
|
||
|
// Concurrency limit (number of parallel tasks). Does not affect already running worker.
|
||
|
// 0 means num CPU.
|
||
|
func (mgr *Worker) Concurrency(num int) *Worker {
|
||
|
mgr.concurrency = num
|
||
|
if num == 0 {
|
||
|
mgr.concurrency = runtime.NumCPU()
|
||
|
}
|
||
|
return mgr
|
||
|
}
|
||
|
|
||
|
// Meta information about requests.
|
||
|
func (mgr *Worker) Meta() meta.Meta {
|
||
|
return mgr.meta
|
||
|
}
|
||
|
|
||
|
// Blobs storage (for large objects).
|
||
|
func (mgr *Worker) Blobs() blob.Blob {
|
||
|
return mgr.blob
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) Run(global context.Context) error {
|
||
|
if mgr.interval < 0 {
|
||
|
return fmt.Errorf("negative interval")
|
||
|
}
|
||
|
if mgr.maxAttempts < 0 {
|
||
|
return fmt.Errorf("negative attempts")
|
||
|
}
|
||
|
if mgr.handler == nil {
|
||
|
return fmt.Errorf("nil handler")
|
||
|
}
|
||
|
if mgr.concurrency <= 0 {
|
||
|
return fmt.Errorf("invalid concurrency number")
|
||
|
}
|
||
|
ctx, cancel := context.WithCancel(global)
|
||
|
defer cancel()
|
||
|
var wg sync.WaitGroup
|
||
|
for i := 0; i < mgr.concurrency; i++ {
|
||
|
wg.Add(1)
|
||
|
go func(i int) {
|
||
|
defer wg.Done()
|
||
|
defer cancel()
|
||
|
err := mgr.runQueue(ctx)
|
||
|
if err != nil {
|
||
|
log.Println("worker", i, "stopped due to error:", err)
|
||
|
} else {
|
||
|
log.Println("worker", i, "stopped")
|
||
|
}
|
||
|
}(i)
|
||
|
}
|
||
|
wg.Add(1)
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
defer cancel()
|
||
|
err := mgr.runReQueue(ctx)
|
||
|
if err != nil {
|
||
|
log.Println("re-queue process stopped due to error:", err)
|
||
|
} else {
|
||
|
log.Println("re-queue process stopped")
|
||
|
}
|
||
|
}()
|
||
|
wg.Wait()
|
||
|
return ctx.Err()
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) call(ctx context.Context, requestID string, info *meta.Request) error {
|
||
|
// caller should ensure that request id is valid
|
||
|
f, err := mgr.blob.Get(requestID)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer f.Close()
|
||
|
|
||
|
req, err := http.NewRequestWithContext(ctx, info.Method, info.URI, f)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
for k, v := range info.Headers {
|
||
|
req.Header[k] = v
|
||
|
}
|
||
|
|
||
|
attemptID := uuid.New().String()
|
||
|
|
||
|
var header meta.AttemptHeader
|
||
|
|
||
|
err = mgr.blob.Push(attemptID, func(out io.Writer) error {
|
||
|
res := openResponse(out)
|
||
|
mgr.handler.ServeHTTP(res, req)
|
||
|
header = res.meta
|
||
|
return nil
|
||
|
})
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
info, err = mgr.meta.AddAttempt(requestID, attemptID, header)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
mgr.requestProcessed(ctx, requestID, attemptID, info)
|
||
|
if header.Code >= minimalFailedCode {
|
||
|
return fmt.Errorf("500 code returned: %d", header.Code)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) runQueue(ctx context.Context) error {
|
||
|
for {
|
||
|
err := mgr.processQueueItem(ctx)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return ctx.Err()
|
||
|
default:
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) runReQueue(ctx context.Context) error {
|
||
|
for {
|
||
|
err := mgr.processReQueueItem(ctx)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return ctx.Err()
|
||
|
default:
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) processQueueItem(ctx context.Context) error {
|
||
|
bid, err := mgr.queue.Get(ctx)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
id := string(bid)
|
||
|
log.Println("processing request", id)
|
||
|
info, err := mgr.meta.Get(id)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("get request %s meta info: %w", id, err)
|
||
|
}
|
||
|
if info.Complete {
|
||
|
return fmt.Errorf("request %s already complete", id)
|
||
|
}
|
||
|
err = mgr.call(ctx, id, info)
|
||
|
if err == nil {
|
||
|
mgr.requestSuccess(ctx, id, info)
|
||
|
return nil
|
||
|
}
|
||
|
return mgr.requeueItem(ctx, id, info)
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) processReQueueItem(ctx context.Context) error {
|
||
|
var item requeueItem
|
||
|
|
||
|
data, err := mgr.requeue.Get(ctx)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
err = json.Unmarshal(data, &item)
|
||
|
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
d := time.Since(item.At)
|
||
|
if d < mgr.interval {
|
||
|
select {
|
||
|
case <-time.After(mgr.interval - d):
|
||
|
case <-ctx.Done():
|
||
|
return ctx.Err()
|
||
|
}
|
||
|
}
|
||
|
return mgr.queue.Push([]byte(item.ID))
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) requeueItem(ctx context.Context, id string, info *meta.Request) error {
|
||
|
if len(info.Attempts) >= mgr.maxAttempts {
|
||
|
mgr.requestDead(ctx, id, info)
|
||
|
log.Println("maximum attempts reached for request", id)
|
||
|
return nil
|
||
|
}
|
||
|
data, err := json.Marshal(requeueItem{
|
||
|
At: time.Now(),
|
||
|
ID: id,
|
||
|
})
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return mgr.requeue.Push(data)
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) saveRequest(req *http.Request) (string, error) {
|
||
|
id := uuid.New().String()
|
||
|
err := mgr.blob.Push(id, func(out io.Writer) error {
|
||
|
_, err := io.Copy(out, req.Body)
|
||
|
return err
|
||
|
})
|
||
|
if err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
return id, mgr.meta.CreateRequest(id, req.Header, req.RequestURI, req.Method)
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) requestDead(ctx context.Context, id string, info *meta.Request) {
|
||
|
err := mgr.meta.Complete(id)
|
||
|
if err != nil {
|
||
|
log.Println("failed complete (dead) request:", err)
|
||
|
}
|
||
|
if handler := mgr.onDead; handler != nil {
|
||
|
handler(ctx, id, info)
|
||
|
}
|
||
|
log.Println("request", id, "completely failed")
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) requestSuccess(ctx context.Context, id string, info *meta.Request) {
|
||
|
err := mgr.meta.Complete(id)
|
||
|
if err != nil {
|
||
|
log.Println("failed complete (success) request:", err)
|
||
|
}
|
||
|
if handler := mgr.onSuccess; handler != nil {
|
||
|
handler(ctx, id, info)
|
||
|
}
|
||
|
log.Println("request", id, "complete successfully")
|
||
|
}
|
||
|
|
||
|
func (mgr *Worker) requestProcessed(ctx context.Context, id string, attemptID string, info *meta.Request) {
|
||
|
if handler := mgr.onProcess; handler != nil {
|
||
|
handler(ctx, id, attemptID, info)
|
||
|
}
|
||
|
log.Println("request", id, "processed with attempt", attemptID)
|
||
|
}
|
||
|
|
||
|
// requeueItem is the record stored (JSON-encoded) in the re-queue for a
// request that is waiting for another attempt.
type requeueItem struct {
	// At is the moment the request was put into the re-queue.
	At time.Time
	// ID is the request ID.
	ID string
}
|