From 53e1acdbe89b3d0f18c4d3d9dfbe3fffb39ed226 Mon Sep 17 00:00:00 2001 From: Alexander Baryshnikov Date: Mon, 28 Sep 2020 23:36:20 +0800 Subject: [PATCH] use sequential ids --- .../meta/micrometa/request_meta_storage.go | 4 +++- worker/worker.go | 22 ++++++++++++++----- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/services/meta/micrometa/request_meta_storage.go b/services/meta/micrometa/request_meta_storage.go index edfbdc4..0e26062 100644 --- a/services/meta/micrometa/request_meta_storage.go +++ b/services/meta/micrometa/request_meta_storage.go @@ -98,7 +98,9 @@ func (rms *MicroMeta) Close() error { func (rms *MicroMeta) Iterate(handler func(id string, record meta.Request) error) error { return rms.db.View(func(txn *badger.Txn) error { - iter := txn.NewIterator(badger.DefaultIteratorOptions) + cfg := badger.DefaultIteratorOptions + cfg.Reverse = true + iter := txn.NewIterator(cfg) iter.Rewind() defer iter.Close() for iter.Valid() { diff --git a/worker/worker.go b/worker/worker.go index 7e90eef..56a2ef6 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -9,13 +9,14 @@ import ( "net/http" "os" "path/filepath" + "regexp" "runtime" "strconv" + "strings" "sync" + "sync/atomic" "time" - "github.com/google/uuid" - "nano-run/services/blob" "nano-run/services/blob/fsblob" "nano-run/services/meta" @@ -41,9 +42,12 @@ func Default(location string) (*Worker, error) { if err != nil { return nil, err } + valid, err := regexp.Compile("^[a-zA-Z0-9-]+$") + if err != nil { + return nil, err + } storage := fsblob.NewCheck(path, func(id string) bool { - _, err := uuid.Parse(id) - return err == nil + return valid.MatchString(id) }) taskQueue, err := microqueue.NewMicroQueue(filepath.Join(location, "queue")) @@ -110,10 +114,16 @@ type Worker struct { concurrency int reloadMeta chan struct{} interval time.Duration + sequence int64 } func (mgr *Worker) init() error { return mgr.meta.Iterate(func(id string, record meta.Request) error { + if v, err := strconv.ParseInt(strings.TrimLeft(id, "0"), 10, 64); err == nil && v > mgr.sequence { + mgr.sequence = v + } else if err != nil { + log.Println("found broken id:", id, "-", err) + } if !record.Complete { log.Println("found incomplete job", id) return mgr.queue.Push([]byte(id)) @@ -276,7 +286,7 @@ func (mgr *Worker) call(ctx context.Context, requestID string, info *meta.Reques return err } defer req.Body.Close() - attemptID := uuid.New().String() + attemptID := fmt.Sprintf("%016d", len(info.Attempts)+1) req.Header.Set("X-Correlation-Id", requestID) req.Header.Set("X-Attempt-Id", attemptID) @@ -411,7 +421,7 @@ func (mgr *Worker) requeueItem(ctx context.Context, id string, info *meta.Reques } func (mgr *Worker) saveRequest(req *http.Request) (string, error) { - id := uuid.New().String() + id := fmt.Sprintf("%016d", atomic.AddInt64(&mgr.sequence, 1)) err := mgr.blob.Push(id, func(out io.Writer) error { _, err := io.Copy(out, req.Body) return err