use sequential ids
This commit is contained in:
parent
4698e52dca
commit
53e1acdbe8
@ -98,7 +98,9 @@ func (rms *MicroMeta) Close() error {
|
||||
|
||||
func (rms *MicroMeta) Iterate(handler func(id string, record meta.Request) error) error {
|
||||
return rms.db.View(func(txn *badger.Txn) error {
|
||||
iter := txn.NewIterator(badger.DefaultIteratorOptions)
|
||||
cfg := badger.DefaultIteratorOptions
|
||||
cfg.Reverse = true
|
||||
iter := txn.NewIterator(cfg)
|
||||
iter.Rewind()
|
||||
defer iter.Close()
|
||||
for iter.Valid() {
|
||||
|
@ -9,13 +9,14 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"nano-run/services/blob"
|
||||
"nano-run/services/blob/fsblob"
|
||||
"nano-run/services/meta"
|
||||
@ -41,9 +42,12 @@ func Default(location string) (*Worker, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
valid, err := regexp.Compile("^[a-zA-Z0-9-]+$")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
storage := fsblob.NewCheck(path, func(id string) bool {
|
||||
_, err := uuid.Parse(id)
|
||||
return err == nil
|
||||
return valid.MatchString(id)
|
||||
})
|
||||
|
||||
taskQueue, err := microqueue.NewMicroQueue(filepath.Join(location, "queue"))
|
||||
@ -110,10 +114,16 @@ type Worker struct {
|
||||
concurrency int
|
||||
reloadMeta chan struct{}
|
||||
interval time.Duration
|
||||
sequence int64
|
||||
}
|
||||
|
||||
func (mgr *Worker) init() error {
|
||||
return mgr.meta.Iterate(func(id string, record meta.Request) error {
|
||||
if v, err := strconv.ParseInt(strings.TrimLeft(id, "0"), 10, 64); err == nil && v > mgr.sequence {
|
||||
mgr.sequence = v
|
||||
} else if err != nil {
|
||||
log.Println("found broken id:", id, "-", err)
|
||||
}
|
||||
if !record.Complete {
|
||||
log.Println("found incomplete job", id)
|
||||
return mgr.queue.Push([]byte(id))
|
||||
@ -276,7 +286,7 @@ func (mgr *Worker) call(ctx context.Context, requestID string, info *meta.Reques
|
||||
return err
|
||||
}
|
||||
defer req.Body.Close()
|
||||
attemptID := uuid.New().String()
|
||||
attemptID := fmt.Sprintf("%016d", len(info.Attempts)+1)
|
||||
|
||||
req.Header.Set("X-Correlation-Id", requestID)
|
||||
req.Header.Set("X-Attempt-Id", attemptID)
|
||||
@ -411,7 +421,7 @@ func (mgr *Worker) requeueItem(ctx context.Context, id string, info *meta.Reques
|
||||
}
|
||||
|
||||
func (mgr *Worker) saveRequest(req *http.Request) (string, error) {
|
||||
id := uuid.New().String()
|
||||
id := fmt.Sprintf("%016d", atomic.AddInt64(&mgr.sequence, 1))
|
||||
err := mgr.blob.Push(id, func(out io.Writer) error {
|
||||
_, err := io.Copy(out, req.Body)
|
||||
return err
|
||||
|
Loading…
x
Reference in New Issue
Block a user