nano-run/worker/worker.go

package worker
import (
"context"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"nano-run/services/blob"
"nano-run/services/blob/fsblob"
"nano-run/services/meta"
"nano-run/services/meta/micrometa"
"nano-run/services/queue"
"nano-run/services/queue/microqueue"
)
type (
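// CompleteHandler is invoked when a request finishes processing (either successfully or after exhausting all attempts).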
CompleteHandler func(ctx context.Context, requestID string, info *meta.Request)
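// ProcessHandler is invoked after each processing attempt.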
ProcessHandler func(ctx context.Context, requestID, attemptID string, info *meta.Request)
)
const (
defaultAttempts = 3
defaultInterval = 3 * time.Second
minimalFailedCode = 500
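// namespace prefixes that distinguish request IDs from attempt IDs in the shared blob and meta key space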
nsRequest byte = 0x00
nsAttempt byte = 0x01
)
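// Default creates a file-system backed worker: request blobs, the processing queue,
// the re-queue and meta storage are all placed under the provided location.
//
// A minimal usage sketch (the storage location, handler and request below are
// placeholders, not part of this package):
//
//	wrk, err := worker.Default("/var/lib/nano-run")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer wrk.Close()
//	wrk.Handler(myHandler) // application handler that actually executes jobs
//	go func() { _ = wrk.Run(context.Background()) }()
//	id, err := wrk.Enqueue(incomingRequest) // incomingRequest is an *http.Request captured elsewhere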
func Default(location string) (*Worker, error) {
path := filepath.Join(location, "blobs")
err := os.MkdirAll(path, 0755)
if err != nil {
return nil, err
}
valid, err := regexp.Compile("^[a-zA-Z0-9-]+$")
if err != nil {
return nil, err
}
storage := fsblob.NewCheck(path, func(id string) bool {
return valid.MatchString(id)
})
taskQueue, err := microqueue.NewMicroQueue(filepath.Join(location, "queue"))
if err != nil {
return nil, err
}
requeue, err := microqueue.NewMicroQueue(filepath.Join(location, "requeue"))
if err != nil {
return nil, err
}
metaStorage, err := micrometa.NewMetaStorage(filepath.Join(location, "meta"))
if err != nil {
return nil, err
}
cleanup := func() {
_ = requeue.Close()
_ = taskQueue.Close()
_ = metaStorage.Close()
}
wrk, err := New(taskQueue, requeue, storage, metaStorage)
if err != nil {
cleanup()
return nil, err
}
wrk.cleanup = cleanup
return wrk, nil
}
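// New assembles a worker from the provided queues, blob storage and meta storage.
// It restores the ID sequence and re-queues incomplete jobs found in the meta storage.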
func New(tasks, requeue queue.Queue, blobs blob.Blob, meta meta.Meta) (*Worker, error) {
wrk := &Worker{
queue: tasks,
requeue: requeue,
blob: blobs,
meta: meta,
reloadMeta: make(chan struct{}, 1),
maxAttempts: defaultAttempts,
interval: defaultInterval,
concurrency: runtime.NumCPU(),
handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
writer.WriteHeader(http.StatusNoContent)
}),
}
err := wrk.init()
if err != nil {
return nil, err
}
return wrk, nil
}
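// Worker executes queued HTTP requests against the configured handler, persisting every attempt
// and retrying failed (5xx) attempts up to the configured limit.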
type Worker struct {
queue queue.Queue
requeue queue.Queue
blob blob.Blob
meta meta.Meta
handler http.Handler
cleanup func()
onDead CompleteHandler
onSuccess CompleteHandler
onProcess ProcessHandler
maxAttempts int
concurrency int
reloadMeta chan struct{}
interval time.Duration
sequence uint64
trackers sync.Map // id -> *Tracker
}
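// init scans the meta storage to restore the highest used sequence number and pushes incomplete jobs back to the queue.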
func (mgr *Worker) init() error {
return mgr.meta.Iterate(func(id string, record meta.Request) error {
if _, v, err := decodeID(id); err == nil && v > mgr.sequence {
mgr.sequence = v
} else if err != nil {
log.Println("found broken id:", id, "-", err)
}
if !record.Complete {
log.Println("found incomplete job", id)
return mgr.queue.Push([]byte(id))
}
return nil
})
}
// Close releases internal resources allocated by Default (queues and meta storage); it is a no-op for workers created via New.
func (mgr *Worker) Close() {
if fn := mgr.cleanup; fn != nil {
fn()
}
}
// Enqueue saves the request body to blob storage, records meta information and pushes the ID into the processing queue.
// The generated ID is always unique and is returned only if the enqueue succeeds.
func (mgr *Worker) Enqueue(req *http.Request) (string, error) {
id, err := mgr.saveRequest(req)
if err != nil {
return "", err
}
log.Println("new request saved:", id)
err = mgr.queue.Push([]byte(id))
return id, err
}
// EnqueueWithTracker behaves like Enqueue: it saves the request body, records meta information and pushes the ID into
// the processing queue, returning a unique ID only on success. In addition it returns a Tracker for observing job
// processing. Tracking jobs is not a free operation; do not use it just because you can.
func (mgr *Worker) EnqueueWithTracker(req *http.Request) (*Tracker, error) {
id, err := mgr.saveRequest(req)
if err != nil {
return nil, err
}
track := newTracker(id)
log.Println("new request saved:", id)
mgr.trackers.Store(id, track)
err = mgr.queue.Push([]byte(id))
if err != nil {
track.close()
mgr.trackers.Delete(id)
return nil, err
}
return track, nil
}
// Complete marks the request as complete and signals the re-queue loop so pending retries can re-check its state.
func (mgr *Worker) Complete(requestID string) error {
err := mgr.meta.Complete(requestID)
if err != nil {
return err
}
select {
case mgr.reloadMeta <- struct{}{}:
default:
}
return nil
}
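// OnSuccess sets the handler invoked once a request completes successfully.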
func (mgr *Worker) OnSuccess(handler CompleteHandler) *Worker {
mgr.onSuccess = handler
return mgr
}
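// OnDead sets the handler invoked once a request has exhausted all attempts.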
func (mgr *Worker) OnDead(handler CompleteHandler) *Worker {
mgr.onDead = handler
return mgr
}
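// OnProcess sets the handler invoked after every processing attempt.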
func (mgr *Worker) OnProcess(handler ProcessHandler) *Worker {
mgr.onProcess = handler
return mgr
}
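// Handler sets the HTTP handler that executes queued requests; by default requests are answered with 204 No Content.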
func (mgr *Worker) Handler(handler http.Handler) *Worker {
mgr.handler = handler
return mgr
}
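// HandlerFunc sets the handler from a plain function (see Handler).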
func (mgr *Worker) HandlerFunc(fn http.HandlerFunc) *Worker {
mgr.handler = fn
return mgr
}
// Attempts sets the maximum number of attempts for requests that fail with a 5xx status code.
func (mgr *Worker) Attempts(max int) *Worker {
mgr.maxAttempts = max
return mgr
}
// Interval sets the delay between attempts.
func (mgr *Worker) Interval(duration time.Duration) *Worker {
mgr.interval = duration
return mgr
}
// Concurrency sets the number of tasks processed in parallel. It does not affect an already running worker.
// A value of 0 means the number of CPUs.
func (mgr *Worker) Concurrency(num int) *Worker {
mgr.concurrency = num
if num == 0 {
mgr.concurrency = runtime.NumCPU()
}
return mgr
}
// Meta returns the storage holding meta information about requests.
func (mgr *Worker) Meta() meta.Meta {
return mgr.meta
}
// Blobs returns the blob storage used for large objects (request bodies and attempt responses).
func (mgr *Worker) Blobs() blob.Blob {
return mgr.blob
}
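// Run validates the configuration, starts the configured number of queue workers plus the re-queue loop,
// and blocks until the context is cancelled or one of the loops fails.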
func (mgr *Worker) Run(global context.Context) error {
if mgr.interval < 0 {
return fmt.Errorf("negative interval")
}
if mgr.maxAttempts < 0 {
return fmt.Errorf("negative attempts")
}
if mgr.handler == nil {
return fmt.Errorf("nil handler")
}
if mgr.concurrency <= 0 {
return fmt.Errorf("invalid concurrency number")
}
ctx, cancel := context.WithCancel(global)
defer cancel()
var wg sync.WaitGroup
for i := 0; i < mgr.concurrency; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
defer cancel()
err := mgr.runQueue(ctx)
if err != nil {
log.Println("worker", i, "stopped due to error:", err)
} else {
log.Println("worker", i, "stopped")
}
}(i)
}
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
err := mgr.runReQueue(ctx)
if err != nil {
log.Println("re-queue process stopped due to error:", err)
} else {
log.Println("re-queue process stopped")
}
}()
wg.Wait()
return ctx.Err()
}
// Retry re-enqueues the stored request as a new job and returns the new request ID.
func (mgr *Worker) Retry(ctx context.Context, requestID string) (string, error) {
info, err := mgr.meta.Get(requestID)
if err != nil {
return "", err
}
req, err := mgr.restoreRequest(ctx, requestID, info)
if err != nil {
return "", err
}
defer req.Body.Close()
return mgr.Enqueue(req)
}
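// call replays the stored request against the handler, saves the response as an attempt blob, records the attempt
// in meta storage and reports a 5xx response as an error so the item gets re-queued.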
func (mgr *Worker) call(ctx context.Context, requestID string, info *meta.Request) error {
// the caller must ensure that the request ID is valid
req, err := mgr.restoreRequest(ctx, requestID, info)
if err != nil {
return err
}
defer req.Body.Close()
attemptID := encodeID(nsAttempt, uint64(len(info.Attempts))+1)
req.Header.Set("X-Correlation-Id", requestID)
req.Header.Set("X-Attempt-Id", attemptID)
req.Header.Set("X-Attempt", strconv.Itoa(len(info.Attempts)+1))
var header meta.AttemptHeader
err = mgr.blob.Push(attemptID, func(out io.Writer) error {
res := openResponse(out)
started := time.Now()
mgr.handler.ServeHTTP(res, req)
header = res.meta
header.StartedAt = started
return nil
})
if err != nil {
return err
}
info, err = mgr.meta.AddAttempt(requestID, attemptID, header)
if err != nil {
return err
}
mgr.requestProcessed(ctx, requestID, attemptID, info)
if header.Code >= minimalFailedCode {
return fmt.Errorf("500 code returned: %d", header.Code)
}
return nil
}
func (mgr *Worker) runQueue(ctx context.Context) error {
for {
err := mgr.processQueueItem(ctx)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
}
func (mgr *Worker) runReQueue(ctx context.Context) error {
for {
err := mgr.processReQueueItem(ctx)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
}
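// processQueueItem takes one request ID from the main queue, executes it and either marks it successful or schedules a retry.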
func (mgr *Worker) processQueueItem(ctx context.Context) error {
bid, err := mgr.queue.Get(ctx)
if err != nil {
return err
}
id := string(bid)
log.Println("processing request", id)
info, err := mgr.meta.Get(id)
if err != nil {
return fmt.Errorf("get request %s meta info: %w", id, err)
}
if info.Complete {
log.Printf("request %s already complete", id)
return nil
}
err = mgr.call(ctx, id, info)
if err == nil {
mgr.requestSuccess(ctx, id, info)
return nil
}
return mgr.requeueItem(ctx, id, info)
}
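// processReQueueItem waits out the remainder of the retry interval (waking up early when reloadMeta fires to re-check
// completion) and then pushes the item back to the main queue.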
func (mgr *Worker) processReQueueItem(ctx context.Context) error {
var item requeueItem
data, err := mgr.requeue.Get(ctx)
if err != nil {
return err
}
err = json.Unmarshal(data, &item)
if err != nil {
return err
}
d := time.Since(item.At)
if d < mgr.interval {
var ok = false
for !ok {
info, err := mgr.meta.Get(item.ID)
if err != nil {
return fmt.Errorf("re-queue: get meta %s: %w", item.ID, err)
}
if info.Complete {
log.Printf("re-queue: %s already complete", item.ID)
return nil
}
select {
case <-time.After(mgr.interval - d):
ok = true
case <-mgr.reloadMeta:
case <-ctx.Done():
return ctx.Err()
}
}
}
return mgr.queue.Push([]byte(item.ID))
}
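// requeueItem schedules another attempt via the re-queue, or marks the request dead once the attempt limit is reached.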
func (mgr *Worker) requeueItem(ctx context.Context, id string, info *meta.Request) error {
if len(info.Attempts) >= mgr.maxAttempts {
mgr.requestDead(ctx, id, info)
log.Println("maximum attempts reached for request", id)
return nil
}
data, err := json.Marshal(requeueItem{
At: time.Now(),
ID: id,
})
if err != nil {
return err
}
return mgr.requeue.Push(data)
}
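// saveRequest stores the request body as a blob under a freshly generated ID and creates the corresponding meta record.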
func (mgr *Worker) saveRequest(req *http.Request) (string, error) {
id := encodeID(nsRequest, atomic.AddUint64(&mgr.sequence, 1))
err := mgr.blob.Push(id, func(out io.Writer) error {
_, err := io.Copy(out, req.Body)
return err
})
if err != nil {
return "", err
}
return id, mgr.meta.CreateRequest(id, req.Header, req.URL.RequestURI(), req.Method)
}
func (mgr *Worker) requestDead(ctx context.Context, id string, info *meta.Request) {
err := mgr.meta.Complete(id)
if err != nil {
log.Println("failed complete (dead) request:", err)
}
if handler := mgr.onDead; handler != nil {
handler(ctx, id, info)
}
log.Println("request", id, "completely failed")
mgr.completeTrack(id, "", true)
}
func (mgr *Worker) requestSuccess(ctx context.Context, id string, info *meta.Request) {
err := mgr.meta.Complete(id)
if err != nil {
log.Println("failed complete (success) request:", err)
}
if handler := mgr.onSuccess; handler != nil {
handler(ctx, id, info)
}
log.Println("request", id, "complete successfully")
}
func (mgr *Worker) requestProcessed(ctx context.Context, id string, attemptID string, info *meta.Request) {
if handler := mgr.onProcess; handler != nil {
handler(ctx, id, attemptID, info)
}
log.Println("request", id, "processed with attempt", attemptID)
mgr.completeTrack(id, attemptID, !info.Success())
}
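// restoreRequest rebuilds an *http.Request from the stored blob and meta record; the caller is responsible for closing the returned body.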
func (mgr *Worker) restoreRequest(ctx context.Context, requestID string, info *meta.Request) (*http.Request, error) {
f, err := mgr.blob.Get(requestID)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, info.Method, info.URI, f)
if err != nil {
_ = f.Close()
return nil, err
}
for k, v := range info.Headers {
req.Header[k] = v
}
return req, nil
}
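// completeTrack resolves and removes the tracker associated with the request, if one was registered.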
func (mgr *Worker) completeTrack(id string, attemptID string, failed bool) {
track, ok := mgr.trackers.LoadAndDelete(id)
if !ok {
return
}
tracker := track.(*Tracker)
if failed {
tracker.failed()
} else {
tracker.ok(attemptID)
}
}
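// encodeID packs a namespace byte and a big-endian uint64 sequence number into an upper-case hex string
// of 18 characters, e.g. encodeID(nsRequest, 1) == "000000000000000001".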
func encodeID(nsID byte, id uint64) string {
var data [9]byte
data[0] = nsID
binary.BigEndian.PutUint64(data[1:], id)
return strings.ToUpper(hex.EncodeToString(data[:]))
}
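// decodeID is the inverse of encodeID: it returns the namespace byte and the sequence number.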
func decodeID(val string) (byte, uint64, error) {
const idLen = 1 + 8
hx, err := hex.DecodeString(val)
if err != nil {
return 0, 0, err
}
if len(hx) != idLen {
return 0, 0, errors.New("invalid id length")
}
n := binary.BigEndian.Uint64(hx[1:])
return hx[0], n, nil
}
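// requeueItem is the JSON payload stored in the re-queue: which request to retry and when the retry was scheduled.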
type requeueItem struct {
At time.Time
ID string
}