diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..47f3bdf --- /dev/null +++ b/.dockerignore @@ -0,0 +1,7 @@ +Dockerfile +/.idea +/_docs +/build +/dist +/run +/conf.d \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e6e702d --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/.idea +/run +/dist +/build \ No newline at end of file diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..0c23a10 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,41 @@ +run: + tests: false +linters: + disable-all: false + enable: + - bodyclose + - deadcode + - depguard + - dogsled + - dupl + - errcheck + - exhaustive + - funlen + - gochecknoinits + - goconst + - gocyclo + - gofmt + - goimports + - golint + - gomnd + - goprintffuncname + - gosec + - gosimple + - govet + - ineffassign + - interfacer + - misspell + - nakedret + - noctx + - nolintlint + - rowserrcheck + - scopelint + - staticcheck + - structcheck + - stylecheck + - typecheck + - unconvert + - unparam + - unused + - varcheck + - whitespace \ No newline at end of file diff --git a/.goreleaser.yml b/.goreleaser.yml new file mode 100644 index 0000000..a35efaa --- /dev/null +++ b/.goreleaser.yml @@ -0,0 +1,71 @@ +project_name: nano-run +builds: + - env: + - CGO_ENABLED=0 + goos: + - linux + - windows + - darwin + goarch: + - amd64 + - arm64 + - arm + flags: + - -trimpath + main: ./cmd/nano-run + +nfpms: + - id: debian + file_name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}{{ if .Arm }}v{{ .Arm }}{{ end }}" + replacements: + Linux: linux + 386: i386 + homepage: https://github.com/reddec/nano-run + maintainer: Baryshnikov Aleksandr + description: Lightweigt runner for web requests + license: Apache-2.0 + formats: + - deb + scripts: + postinstall: "bundle/debian/postinstall.sh" + preremove: "bundle/debian/preremove.sh" + empty_folders: + - /etc/nano-run/conf.d + config_files: + "bundle/debian/server.yaml": "/etc/nano-run/server.yaml" +uploads: + - name: bintray + method: PUT + mode: archive + username: reddec + custom_artifact_name: true + ids: + - debian + target: 'https://api.bintray.com/content/reddec/debian/{{ .ProjectName }}/{{ .Version }}/{{ .ArtifactName }};publish=1;deb_component=main;deb_distribution=all;deb_architecture={{ .Arch }}' +dockers: + - binaries: + - nano-run + dockerfile: Dockerfile + extra_files: + - bundle/docker/server.yaml + build_flag_templates: + - "--label=org.opencontainers.image.created={{.Date}}" + - "--label=org.opencontainers.image.title={{.ProjectName}}" + - "--label=org.opencontainers.image.revision={{.FullCommit}}" + - "--label=org.opencontainers.image.version={{.Version}}" + image_templates: + - "reddec/nano-run:{{ .Tag }}" + - "reddec/nano-run:v{{ .Major }}" + - "reddec/nano-run:v{{ .Major }}.{{ .Minor }}" + - "reddec/nano-run:latest" +checksum: + name_template: 'checksums.txt' +snapshot: + name_template: "{{ .Tag }}-next" +changelog: + sort: asc + filters: + exclude: + - '^docs:' + - '^test:' + - '^build:' diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d4c0e1a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,7 @@ +FROM alpine:3.12 +VOLUME /data +VOLUME /conf.d +EXPOSE 80 +COPY nano-run /bin/nano-run +COPY bundle/docker/server.yaml /server.yaml +CMD ["/bin/nano-run", "server", "run", "-f", "-c", "server.yaml"] \ No newline at end of file diff --git a/Dockerfile.build b/Dockerfile.build new file mode 100644 index 0000000..16d4ed4 --- /dev/null +++ b/Dockerfile.build @@ -0,0 +1,13 @@ +FROM golang:1.15-alpine3.12 AS build +WORKDIR /go/src/app +COPY . . + +RUN go get -d -v ./... +RUN go build -o nano-run -v ./cmd/nano-run/... + +FROM alpine:3.12 +VOLUME /data +VOLUME /conf.d +COPY docker/server.yaml /server.yaml +COPY --from=build /go/src/app/nano-run /bin/nano-run +CMD ["/bin/nano-run", "server", "run", "-f", "-c", "server.yaml"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..d8d641c --- /dev/null +++ b/README.md @@ -0,0 +1,19 @@ +# Nano-Run + +Lightweight async request runner. + +A simplified version of [trusted-cgi](https://github.com/reddec/trusted-cgi) designed +for async processing extreme amount of requests. + +## Goals + +* Minimal requirements for host; +* Should have semi-constant resource consumption regardless of: + * number of requests, + * size of requests, + * kind of requests; +* Should be ready to run without configuration; +* Should be ready for deploying in clouds; +* Should support extending for another providers; +* Can be used as library and as a complete solution; +* **Performance (throughput/latency) has less priority** than resource usage. \ No newline at end of file diff --git a/_docs/api.md b/_docs/api.md new file mode 100644 index 0000000..b7ba39d --- /dev/null +++ b/_docs/api.md @@ -0,0 +1,114 @@ +# API + +Base url: `/{application}` + + +## Start process + +||| +|------------|------| +| **Method** | POST | +| **Path** | / | + +Saves and enqueue request. Return 303 See Other on success with headers. + +* `X-Correlation-Id` - with ID of request +* `Location` - URL to status + +Clients can use cUrl flag `-L` to automatically follow redirect + + curl -L -d '' 'http://127.0.0.1:8989/app/' + +## Get status + + +||| +|------------|------------------| +| **Method** | GET | +| **Path** | /{correlationId} | + +Returns JSON with full meta-data of request and following headers: + +* `Content-Version` - number of attempts +* `Last-Modified` - latest of time of creation, time of last attempt or completion time +* `Location` - URL to the complete attempt +* `X-Status` - status of request processing: `complete` or `processing` +* `X-Last-Attempt` - id of last attempt +* `X-Last-Attempt-At` - time of last attempt +* `X-Correlation-Id` - with ID of request + +Body example: + +```json +{ + "created_at": "2020-09-10T17:11:33.598542177+08:00", + "complete_at": "2020-09-10T17:11:33.616550544+08:00", + "attempts": [ + { + "code": 200, + "headers": {}, + "id": "51748767-e89b-48a1-8b00-b9c1f0fdc9bb", + "created_at": "2020-09-10T17:11:33.614030236+08:00" + } + ], + "headers": { + "Accept": [ + "*/*" + ], + "Content-Length": [ + "0" + ], + "Content-Type": [ + "application/x-www-form-urlencoded" + ], + "User-Agent": [ + "curl/7.68.0" + ] + }, + "uri": "/date/", + "method": "POST", + "complete": true +} +``` + +## Get complete + +||| +|------------|---------------------------| +| **Method** | GET | +| **Path** | /{correlationId}/complete | + +Will redirect to the complete attempt or 404 + + + +## Get attempt + +||| +|------------|--------------------------------------| +| **Method** | GET | +| **Path** | /{correlationId}/attempt/{attemptId} | + +Get result of processing request for the defined attempt + +Returns body, code and headers same as processor returned with additional headers: + +* `X-Status` - status of request processing: `complete` or `processing` +* `X-Processed` - `true` to distinguish result + + + +## Get request + +||| +|------------|--------------------------| +| **Method** | GET | +| **Path** | /{correlationId}/request | + +Get request same as it was POSTed for the defined ID + +Returns body and headers same as processor got from client with additional headers: + +* `X-Method` - request method (currently always `POST`) +* `X-Request-Uri` - request URI +* `Last-Modified` - time of creation diff --git a/_docs/authorization.md b/_docs/authorization.md new file mode 100644 index 0000000..70a1476 --- /dev/null +++ b/_docs/authorization.md @@ -0,0 +1,116 @@ +# Authorization + +By-default - authorization disabled. Multiple policies allowed. +To allow request at least one policy should be passed. +Each authorization policy can enabled by `enable: yes` param. + +Section in `server.yaml`: `authorization` + +## JWT + +*section: `authorization.jwt`* + +[Overview](https://jwt.io/) + +HMAC 256 signature validation against secret key + +Configurable parameters: + +* `header` (optional, string, default: `Authorization`) - header that contains JWT +* `secret` (required, string) - secret key to validate signature + +Example minimal unit config + +```yaml +command: 'echo hello world' +authorization: + jwt: + enable: yes + secret: '$eCrEtKey' +``` + +## Query token + +*section: `authorization.query_token`* + +Plain token in a query string. Will be matched against list of allowed tokens. + +For example, client can invoke endpoint by addition token query: `http://example.com/app/?token=deadbeaf` + +Configurable parameters: + +* `param` (optional, string, default: `token`) - query param where token should be placed +* `tokens` (required, []string) - list of allowed tokens + +Example minimal unit config with 3 tokens + +```yaml +command: 'echo hello world' +authorization: + query_token: + enable: yes + tokens: + - my-token-1 + - his-token-2 + - deadbeaf +``` + +## Header token + +*section: `authorization.header_token`* + +Plain token in a header. Will be matched against list of allowed tokens. + +For example, client can invoke endpoint by curl: + + curl -H 'X-Api-Token: deadbeaf' http://example.com/app/ + +Configurable parameters: + +* `header` (optional, string, default: `X-Api-Token`) - header name where token should be placed +* `tokens` (required, []string) - list of allowed tokens + +Example minimal unit config with 3 tokens + +```yaml +command: 'echo hello world' +authorization: + header_token: + enable: yes + tokens: + - my-token-1 + - his-token-2 + - deadbeaf +``` + +## Basic + +*section: `authorization.basic`* + +Basic authentication. [Overview](https://en.wikipedia.org/wiki/Basic_access_authentication) + +For example, client can invoke endpoint by curl: + + curl -u 'alice:admin' http://example.com/app/ + +To [calculate](https://unix.stackexchange.com/a/419855) hash you may use `htpasswd` (Debian/Ubuntu: `sudo apt install apache2-utils`) + + htpasswd -bnBC 10 "" password | tr -d ':' + +where `passsword` is a desired password for the user. + +Configurable parameters: + +* `users` (string->string, required) - map of users and their hashed password by bcrypt + +Example minimal config: + +```yaml +command: 'echo hello world' +authorization: + basic: + enable: yes + users: + alice: '$2y$10$cUe3n8NHaxee.AaGzT8wF.nirPnjv5YLEQGTsLiiMiUAknM2aF2FS' + bob: '$2y$10$iSczi.MlKTrMv3h0Zf.GDeW1NS6ZWxBgtj4ytrKKDrR2s2wIxq5Qa' +``` diff --git a/_docs/docker.md b/_docs/docker.md new file mode 100644 index 0000000..df3f985 --- /dev/null +++ b/_docs/docker.md @@ -0,0 +1,38 @@ +# Docker + + +Check images in [releases](https://github.com/reddec/nano-run/releases) + +* Latest one: `reddec/nano-run:latest` + + +Create Dockerfile inherited from the image and copy configuration and binaries + +## Minimal example + +**app.yaml** +```yaml +command: '/mybinary --with --some args' +``` + +**Dockerfile** +```dockerfile +FROM reddec/nano-run +COPY app.yaml /conf.d/app.yaml +COPY mybinary /mybinary +``` + +**Build & Run** + +```bash +docker run --rm -p 127.0.0.1:8080:80 $(docker build -q .) +``` + +Check it's working by sending test request + +``` +curl -v -X POST "http://127.0.0.1:8080/app/" +``` + +* To keep tasks persistent - mount `/data` volume like: +`docker run -v $(pwd)/data:data ...` \ No newline at end of file diff --git a/_docs/flow.md b/_docs/flow.md new file mode 100644 index 0000000..902cac2 --- /dev/null +++ b/_docs/flow.md @@ -0,0 +1,14 @@ +# High-level overview + +Subjects: + +* Client - the side which is making HTTP(S) request to the System +* System - instance of nano-run that routing request +* Worker - executable that implements business logic + +During restart - all incomplete tasks will queued again. + +![image](https://user-images.githubusercontent.com/6597086/92712138-d8b58580-f38b-11ea-8a26-251df5c4ae13.png) + +![image](https://user-images.githubusercontent.com/6597086/92578247-3085bb00-f2be-11ea-87de-e2c9d94a21fa.png) + diff --git a/_docs/flow.txt b/_docs/flow.txt new file mode 100644 index 0000000..dbb19c1 --- /dev/null +++ b/_docs/flow.txt @@ -0,0 +1,19 @@ +title Request processing + +Client->System: HTTP(s) request +System->System: Save request +System->Client: Correlation ID +loop attempts +System->Worker: Request +alt request failed +Worker->System: failed +System->System: requeue +else success +Worker->System: success +end +System->System: save attemp +end + +System->System: mark request as complete +Client->System: get result +System->Client: info \ No newline at end of file diff --git a/_docs/goals.md b/_docs/goals.md new file mode 100644 index 0000000..2aa9b08 --- /dev/null +++ b/_docs/goals.md @@ -0,0 +1,12 @@ +# Goals + +* Minimal requirements for host; +* Should have semi-constant resource consumption regardless of: + * number of requests, + * size of requests, + * kind of requests; +* Should be ready to run without configuration; +* Should be ready for deploying in clouds; +* Should support extending for another providers; +* Can be used as library and as a complete solution; +* Performance has less priority than resource usage. \ No newline at end of file diff --git a/_docs/internals/worker_flow b/_docs/internals/worker_flow new file mode 100644 index 0000000..10c0a2a --- /dev/null +++ b/_docs/internals/worker_flow @@ -0,0 +1 @@ +7Vzdc5s4EP9rPNN76A3f4MdzmjQP6U2veWjvUQbZpgZEhRzb99efBMJ8CNtyEiNIM9OZWgJ9sLu/3dXuKhPzJt59xiBdfUEBjCaGFuwm5qeJYeiWYUzYPy3YFz2uNS06ljgM+EtVx2P4H+SdGu/dhAHMGi8ShCISps1OHyUJ9EmjD2CMts3XFihqrpqCJRQ6Hn0Qib3fw4Csil7PcKv+exguV+XKusO/Lwbly/xLshUI0LbWZd5OzBuMECl+xbsbGDHilXQpxt0deXrYGIYJkRnw/Uv02VqvH2ffluv7qf3wz0OQfHStYponEG34F2P48dcG0halVkQ3XGye7EuKZNswjkBCW7OMYLSGNyhCOH9mBs7csR32hL90F0ZR+TxB+aBFrYsOWSwWhu/T/gBkK8j2qvPGV0AIxEneY2iUXjO+V4gJ3B2lgn6gLRVKiGJI8J6+wgfYFmcHl0ezbG8r7poly1Y1ztqlRAIuUcvD3IflvlEJBMkygtV6ltlcz9HF9aYdyxlmczUQMVoAAmdokwRZndP0R+1Lq66c/xfIgmN2yEIpChkBlOqBIA2YbefAtu0qJPAxBT57uqUagfatSBzxx1L8OyGnR7naInIHT/UunjonWNqg7sWktAVSpigfRkC2ZsjCKKb/ldT9UILujxME1oZDYF09hQ2BUjCgmps3ESYrtEQJiG6r3lmTltU7DwilnII/ISF7bobAhqAmfeEuJD/Y8D9t3vq39uTTjs+cN/a8UeyTbe45HKFfiDbYhxK4pQhdwlMzFlIp8hjDCJDwqbm/1+eYK2Ai2/g+zOhCd6LYr1A832QKRd5uivzBjtdE3uuQeO9qEm8JRPptJd6WlXhXqcR7Jw0qStPBGlTdcwZmUR2BlglSCQi9BocKHN2AoKzA+xqKWPMAI9aohuWt/oDkygLJUwqkqcD8LQhJfp7D+S5+bULMwKSRMIbj8KG6Th79YsrVBLLuYTYWK3MMVNp4QDVVCSrXVMrpZ6nPBp8rtqvitKvLctpUymldwDlMSj+EMogFq8KkOo6OQXse4irqtOdoTqDD1ZRTSfy4R4J6PeGnw/1AeA2Z61GESLUPOX4oZUAU5RHwvO2jxN9gSi1/z12TTAzw1OKpEiHQVvQ0sKEXWF2hWM+Ym47zOkHTFvY6TsJWVwxzal0Le7p4GjhwRHWk8ohRL6eZWk0vULUa08WwzKlQpSLbcBlRdfVUnY7FODxbyZdfeE7Jl1g9q+R1pcEaQzwMUZ0KQczPlzAjFRgygjBLHI4DDqZyOBhiailMnqjBYhsASRAxzT0KWlq6rZqWo4kD9+53yqqkkl9nVZKh9NxmiKa5RI0T0W3P5hQ1zpL9UpxJOY0hQ29iqCsH3msqxRrQ0U2ThFBPoQ9ZCJmyoQ+1Vt0UQx8xwOu6TXdAzICQzLM0J1wbWj6K0wiSoRr7dn1Jx+GsX/tkibZedVT5gljj9W2PbHTYVBrzMA2Bixl4Yv4aC0nEKRkoHNr+mt1ha3r217zhGJuR+mulMJ7319QaG7UZlbfAaUO2lqkQiRdwmg/9isKE1FxVu3Xcc+3mFMUX8FGVvJydyJ629EvxhcJEr1XMabljsbjHXFxDvY8rW2b0Yh83H/oXxmBfeyFlspEdl1W96fl5U+2kRLZe1y27JXfF+q8qhaZYB1vLPBbxZZaAbPvctRr0McWdPUe1s2GJBH93vmvyKBuLVut8W0MrdnsTvoU885XWSZsi82OwmzA10laT/CiWdSlQ4Oep5DHEA92OSrhe44GmGGj9ezR4uzpu5CMWjkrc2GICT7XpGxIXLU+Si7XLiSpMn1gzP6rchqE8t/FeedAubTkr8pbS6nZbzFHUipnU3hI5k3to3ZFSXmdgi7mH91si/ZkOW+mpyRPLNO+P1Ja8qOZy4fkwv7Eu1FzOPduyNVnEXXZRvSOv13lP3dSuBS5PdLCqwiiAMWWwMkXlnbHS7rD0lCeqfJ5dqyg6R8Geh8e0eYTmE+VlZ5cRWVeeiS7vML5dV8iTVeElds+qcE+tCjeO4YLOD9joZIFKUPCuUYGi63jQMyjsd1C0tPB5UBhKQSE6tVUSpbIXdI4B3eY6AwvdGZyxUFpWe8k54fq4MGRxobQ81hOjtj7CxdooKRGBIdngZLCeaRsI6l3TAZUrydbG9nNelsePbBGLd6VyJdqs/q5ckcuv/jqfefs/ \ No newline at end of file diff --git a/_docs/unit.md b/_docs/unit.md new file mode 100644 index 0000000..da58894 --- /dev/null +++ b/_docs/unit.md @@ -0,0 +1,3 @@ +# Unit + +* If work dir is not defined - temporary directory will be created and removed after execution for each request automatically. \ No newline at end of file diff --git a/bundle/debian/nano-run.service b/bundle/debian/nano-run.service new file mode 100644 index 0000000..c94df6d --- /dev/null +++ b/bundle/debian/nano-run.service @@ -0,0 +1,12 @@ +[Unit] +Description=Lightweight async request processor + +[Service] +ExecStart=/usr/bin/nano-run server run -c /etc/nano-run/server.yaml +Restart=always +RestartSec=3 +User=nano-run +Group=nano-run + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/bundle/debian/postinstall.sh b/bundle/debian/postinstall.sh new file mode 100755 index 0000000..ecca49f --- /dev/null +++ b/bundle/debian/postinstall.sh @@ -0,0 +1,12 @@ +#!/bin/sh + +SERVICE="nano-run" +RUNNING_USER="${SERVICE}" + +if ! id -u ${RUNNING_USER}; then + echo "Creating user ${RUNNING_USER}..." + useradd -M -c "${RUNNING_USER} dummy user" -r -s /bin/nologin ${RUNNING_USER} +fi + +systemctl enable "${SERVICE}".service || echo "failed to enable service" +systemctl start "${SERVICE}".service || echo "failed to start service" diff --git a/bundle/debian/preremove.sh b/bundle/debian/preremove.sh new file mode 100755 index 0000000..7987f5f --- /dev/null +++ b/bundle/debian/preremove.sh @@ -0,0 +1,14 @@ +#!/bin/sh + +SERVICE="nano-run" +RUNNING_USER="${SERVICE}" + +systemctl stop "${SERVICE}".service || echo "failed to stop service" +systemctl disable "${SERVICE}".service || echo "failed to disable service" + +if id -u ${RUNNING_USER}; then + echo "Removing user ${RUNNING_USER}..." + userdel -r ${RUNNING_USER} +fi + + diff --git a/bundle/debian/server.yaml b/bundle/debian/server.yaml new file mode 100755 index 0000000..72c01a4 --- /dev/null +++ b/bundle/debian/server.yaml @@ -0,0 +1,6 @@ +# Location to store tasks, blobs and queues +working_directory: /var/nano-run +config_directory: /etc/nano-run/conf.d +bind: 127.0.0.1:8989 +graceful_shutdown: 5s +max_request: 1048576 diff --git a/bundle/docker/server.yaml b/bundle/docker/server.yaml new file mode 100644 index 0000000..430daef --- /dev/null +++ b/bundle/docker/server.yaml @@ -0,0 +1,5 @@ +working_directory: /data +config_directory: /conf.d +bind: ":80" +graceful_shutdown: 5s +max_request: 1048576 diff --git a/cmd/nano-run/main.go b/cmd/nano-run/main.go new file mode 100644 index 0000000..c209b48 --- /dev/null +++ b/cmd/nano-run/main.go @@ -0,0 +1,50 @@ +package main + +import ( + "context" + "os" + "os/signal" + + "github.com/jessevdk/go-flags" +) + +var ( + version = "dev" + commit = "dev" +) + +type Config struct { + Run runCmd `command:"run" description:"run single unit"` + Server serverCmd `command:"server" description:"manage server"` +} + +func main() { + if len(os.Args) == 1 { + os.Args = append(os.Args, "server", "run") + } + var cfg Config + parser := flags.NewParser(&cfg, flags.Default) + parser.LongDescription = "Async webhook processor with minimal system requirements.\n\n" + + "Author: Baryshnikov Aleksandr \n" + + "Source code: https://github.com/reddec/nano-run\n" + + "License: Apache 2.0\n" + + "Version: " + version + "\n" + + "Revision: " + commit + _, err := parser.Parse() + if err != nil { + os.Exit(1) + } +} + +func SignalContext() context.Context { + gctx, closer := context.WithCancel(context.Background()) + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, signals...) + for range c { + closer() + break + } + }() + return gctx +} diff --git a/cmd/nano-run/run_cmd.go b/cmd/nano-run/run_cmd.go new file mode 100644 index 0000000..c542439 --- /dev/null +++ b/cmd/nano-run/run_cmd.go @@ -0,0 +1,65 @@ +package main + +import ( + "io/ioutil" + "os" + "path/filepath" + runtime "runtime" + "strconv" + "strings" + "time" + + "nano-run/server" +) + +type runCmd struct { + Directory string `long:"directory" short:"d" env:"DIRECTORY" description:"Data directory" default:"run"` + Interval time.Duration `long:"interval" short:"i" env:"INTERVAL" description:"Requeue interval" default:"3s"` + Attempts int `long:"attempts" short:"a" env:"ATTEMPTS" description:"Max number of attempts" default:"5"` + Concurrency int `long:"concurrency" short:"c" env:"CONCURRENCY" description:"Number of parallel worker (0 - mean number of CPU)" default:"0"` + Mode string `long:"mode" short:"m" env:"MODE" description:"Running mode" default:"bin" choice:"bin" choice:"cgi" choice:"proxy"` + Bind string `long:"bind" short:"b" env:"BIND" description:"Binding address" default:"127.0.0.1:8989"` + Args struct { + Executable string `arg:"executable" description:"path to binary to invoke or url" required:"yes"` + Args []string `arg:"args" description:"executable args"` + } `positional-args:"yes"` +} + +func (cfg *runCmd) Execute([]string) error { + tmpDir, err := ioutil.TempDir("", "") + if err != nil { + return err + } + defer os.RemoveAll(tmpDir) + srv := server.DefaultConfig() + srv.Bind = cfg.Bind + srv.WorkingDirectory = cfg.Directory + srv.ConfigDirectory = tmpDir + + unit := server.DefaultUnit() + + var params []string + params = append(params, strconv.Quote(cfg.Args.Executable)) + for _, arg := range cfg.Args.Args { + params = append(params, strconv.Quote(arg)) + } + unit.Command = strings.Join(params, " ") + unit.WorkDir, _ = os.Getwd() + unit.Attempts = cfg.Attempts + unit.Interval = cfg.Interval + unit.Workers = cfg.concurrency() + unit.Mode = cfg.Mode + + err = unit.SaveFile(filepath.Join(tmpDir, "main.yaml")) + if err != nil { + return err + } + return srv.Run(SignalContext()) +} + +func (cfg runCmd) concurrency() int { + if cfg.Concurrency <= 0 { + return runtime.NumCPU() + } + return cfg.Concurrency +} diff --git a/cmd/nano-run/server_cmd.go b/cmd/nano-run/server_cmd.go new file mode 100644 index 0000000..3eb8c6b --- /dev/null +++ b/cmd/nano-run/server_cmd.go @@ -0,0 +1,71 @@ +package main + +import ( + "log" + "os" + "path/filepath" + + "nano-run/server" +) + +type serverCmd struct { + Run serverRunCmd `command:"run" description:"run server"` + Init serverInitCmd `command:"init" description:"initialize server"` +} + +type serverInitCmd struct { + Directory string `short:"d" long:"directory" env:"DIRECTORY" description:"Target directory" default:"server"` + ConfigFile string `long:"config-file" env:"CONFIG_FILE" description:"Config file name" default:"server.yaml"` + NoSample bool `long:"no-sample" env:"NO_SAMPLE" description:"Do not create same file"` +} + +func (cmd *serverInitCmd) Execute([]string) error { + err := os.MkdirAll(cmd.Directory, 0755) + if err != nil { + return err + } + cfg := server.DefaultConfig() + err = cfg.SaveFile(filepath.Join(cmd.Directory, cmd.ConfigFile)) + if err != nil { + return err + } + err = os.MkdirAll(filepath.Join(cmd.Directory, cfg.ConfigDirectory), 0755) + if err != nil { + return err + } + err = os.MkdirAll(filepath.Join(cmd.Directory, cfg.WorkingDirectory), 0755) + if err != nil { + return err + } + if !cmd.NoSample { + unit := server.DefaultUnit() + err = unit.SaveFile(filepath.Join(cmd.Directory, cfg.ConfigDirectory, "sample.yaml")) + if err != nil { + return err + } + } + return nil +} + +type serverRunCmd struct { + Fail bool `short:"f" long:"fail" env:"FAIL" description:"Fail if no config file"` + Config string `short:"c" long:"config" env:"CONFIG" description:"Configuration file" default:"server.yaml"` +} + +func (cmd *serverRunCmd) Execute([]string) error { + cfg := server.DefaultConfig() + err := cfg.LoadFile(cmd.Config) + if os.IsNotExist(err) && !cmd.Fail { + log.Println("no config file found - using transient default configuration") + cfg.ConfigDirectory = filepath.Join("run", "conf.d") + cfg.WorkingDirectory = filepath.Join("run", "data") + err := cfg.CreateDirs() + if err != nil { + return err + } + } else if err != nil { + return err + } + log.Println("configuration loaded") + return cfg.Run(SignalContext()) +} diff --git a/cmd/nano-run/signals_default.go b/cmd/nano-run/signals_default.go new file mode 100644 index 0000000..7b68308 --- /dev/null +++ b/cmd/nano-run/signals_default.go @@ -0,0 +1,9 @@ +// +build !darwin,!linux + +package main + +import ( + "os" +) + +var signals = []os.Signal{os.Interrupt} diff --git a/cmd/nano-run/signals_posix.go b/cmd/nano-run/signals_posix.go new file mode 100644 index 0000000..058898c --- /dev/null +++ b/cmd/nano-run/signals_posix.go @@ -0,0 +1,10 @@ +// +build linux darwin + +package main + +import ( + "os" + "syscall" +) + +var signals = []os.Signal{syscall.SIGTERM, os.Interrupt} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4363e62 --- /dev/null +++ b/go.mod @@ -0,0 +1,13 @@ +module nano-run + +go 1.14 + +require ( + github.com/dgraph-io/badger v1.6.1 + github.com/dgrijalva/jwt-go v3.2.0+incompatible + github.com/google/uuid v1.1.2 + github.com/gorilla/mux v1.8.0 + github.com/jessevdk/go-flags v1.4.1-0.20200711081900-c17162fe8fd7 + golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a + gopkg.in/yaml.v2 v2.3.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d356f2c --- /dev/null +++ b/go.sum @@ -0,0 +1,86 @@ +github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkByhMAwYaFAX9w2l7vxvBQ5NMoxDrkhqhtn4= +github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger v1.6.1 h1:w9pSFNSdq/JPM1N12Fz/F/bzo993Is1W+Q7HjPzi7yg= +github.com/dgraph-io/badger v1.6.1/go.mod h1:FRmFw3uxvcpa8zG3Rxs0th+hCLIuaQg8HlNV5bjgnuU= +github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po= +github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jessevdk/go-flags v1.4.1-0.20200711081900-c17162fe8fd7 h1:Ug59miTxVKVg5Oi2S5uHlKOIV5jBx4Hb2u0jIxxDaSs= +github.com/jessevdk/go-flags v1.4.1-0.20200711081900-c17162fe8fd7/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM= +golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/nano_logger.go b/internal/nano_logger.go new file mode 100644 index 0000000..0fe7df9 --- /dev/null +++ b/internal/nano_logger.go @@ -0,0 +1,31 @@ +package internal + +import ( + "log" + + "github.com/dgraph-io/badger" +) + +func NanoLogger(wrap *log.Logger) badger.Logger { + return &nanoLogger{logger: wrap} +} + +type nanoLogger struct { + logger *log.Logger +} + +func (nl *nanoLogger) Errorf(s string, i ...interface{}) { + nl.logger.Printf("[error] "+s, i...) +} + +func (nl *nanoLogger) Warningf(s string, i ...interface{}) { + nl.logger.Printf("[warn] "+s, i...) +} + +func (nl *nanoLogger) Infof(s string, i ...interface{}) { + nl.logger.Printf("[info] "+s, i...) +} + +func (nl *nanoLogger) Debugf(s string, i ...interface{}) { + nl.logger.Printf("[debug] "+s, i...) +} diff --git a/server/auth.go b/server/auth.go new file mode 100644 index 0000000..9ed6e9c --- /dev/null +++ b/server/auth.go @@ -0,0 +1,129 @@ +package server + +import ( + "errors" + "net/http" + + "github.com/dgrijalva/jwt-go" + "golang.org/x/crypto/bcrypt" +) + +func (cfg Unit) enableAuthorization() func(handler http.Handler) http.Handler { + var handlers []AuthHandlerFunc + if cfg.Authorization.JWT.Enable { + handlers = append(handlers, cfg.Authorization.JWT.Create()) + } + if cfg.Authorization.QueryToken.Enable { + handlers = append(handlers, cfg.Authorization.QueryToken.Create()) + } + if cfg.Authorization.HeaderToken.Enable { + handlers = append(handlers, cfg.Authorization.HeaderToken.Create()) + } + if cfg.Authorization.Basic.Enable { + handlers = append(handlers, cfg.Authorization.Basic.Create()) + } + if len(handlers) == 0 { + return func(handler http.Handler) http.Handler { + return handler + } + } + return func(handler http.Handler) http.Handler { + return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + var authorized bool + for _, h := range handlers { + if h(request) { + authorized = true + break + } + } + if !authorized { + writer.WriteHeader(http.StatusForbidden) + return + } + handler.ServeHTTP(writer, request) + }) + } +} + +type AuthHandlerFunc func(req *http.Request) bool + +type JWT struct { + Header string `yaml:"header"` // JWT header - by default Authorization + Secret string `yaml:"secret"` // key to verify JWT +} + +func (cfg JWT) Create() AuthHandlerFunc { + header := cfg.Header + if header == "" { + header = "Authorization" + } + + return func(req *http.Request) bool { + rawToken := req.Header.Get(header) + t, err := jwt.Parse(rawToken, func(token *jwt.Token) (interface{}, error) { + if token.Method != jwt.SigningMethodHS256 { + return nil, errors.New("unknown method") + } + return []byte(cfg.Secret), nil + }) + return err == nil && t.Valid + } +} + +type QueryToken struct { + Param string `yaml:"param"` // query name - by default 'token' + Tokens []string `yaml:"tokens"` // allowed tokens +} + +func (cfg QueryToken) Create() AuthHandlerFunc { + param := cfg.Param + if param == "" { + param = "token" + } + tokens := map[string]bool{} + for _, k := range cfg.Tokens { + tokens[k] = true + } + return func(req *http.Request) bool { + token := req.URL.Query().Get(param) + return tokens[token] + } +} + +type HeaderToken struct { + Header string `yaml:"header"` // header name - by default X-Api-Token + Tokens []string `yaml:"tokens"` // allowed tokens +} + +func (cfg HeaderToken) Create() AuthHandlerFunc { + header := cfg.Header + if header == "" { + header = "X-Api-Token" + } + tokens := map[string]bool{} + for _, k := range cfg.Tokens { + tokens[k] = true + } + return func(req *http.Request) bool { + token := req.URL.Query().Get(header) + return tokens[token] + } +} + +type Basic struct { + Users map[string]string `yaml:"users"` // users -> bcrypted password map +} + +func (cfg Basic) Create() AuthHandlerFunc { + return func(req *http.Request) bool { + u, p, ok := req.BasicAuth() + if !ok { + return false + } + h, ok := cfg.Users[u] + if !ok { + return false + } + return bcrypt.CompareHashAndPassword([]byte(h), []byte(p)) == nil + } +} diff --git a/server/internal/adapter.go b/server/internal/adapter.go new file mode 100644 index 0000000..7d18312 --- /dev/null +++ b/server/internal/adapter.go @@ -0,0 +1,192 @@ +package internal + +import ( + "encoding/json" + "io" + "log" + "net/http" + "strconv" + "time" + + "github.com/gorilla/mux" + + "nano-run/services/meta" + "nano-run/worker" +) + +// Expose worker as HTTP handler: +// POST / - post task async, returns 303 See Other and location. +// GET /:id - get task info. +// GET /:id/completed - redirect to completed attempt (or 404 if task not yet complete) +// GET /:id/attempt/:atid - get attempt result (as-is). +// GET /:id/request - replay request (as-is). +func Expose(router *mux.Router, wrk *worker.Worker) { + //TODO: wait + router.Path("/").Methods("POST").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + id, err := wrk.Enqueue(request) + if err != nil { + log.Println("failed to enqueue:", err) + http.Error(writer, "failed to enqueue", http.StatusInternalServerError) + return + } + writer.Header().Set("X-Correlation-Id", id) + http.Redirect(writer, request, id, http.StatusSeeOther) + }) + // get state: 200 with json description. For complete request - Location header will be filled. + router.Path("/{id}").Methods("GET").HandlerFunc(createTask(wrk)) + router.Path("/{id}/completed").Methods("GET").HandlerFunc(getComplete(wrk)) + // get attempt result as-is. + router.Path("/{id}/attempt/{attemptId}").Methods("GET").HandlerFunc(getAttempt(wrk)) + // get recorded request. + router.Path("/{id}/request").Methods("GET").HandlerFunc(getRequest(wrk)) +} + +// get request meta information. +func createTask(wrk *worker.Worker) http.HandlerFunc { + return func(writer http.ResponseWriter, request *http.Request) { + params := mux.Vars(request) + requestID := params["id"] + info, err := wrk.Meta().Get(requestID) + if err != nil { + log.Println("failed access request", requestID, ":", err) + http.NotFound(writer, request) + return + } + data, err := json.Marshal(info) + if err != nil { + log.Println("failed to encode info:", err) + http.Error(writer, "encoding", http.StatusInternalServerError) + return + } + writer.Header().Set("Content-Type", "application/json") + writer.Header().Set("Content-Length", strconv.Itoa(len(data))) + writer.Header().Set("Content-Version", strconv.Itoa(len(info.Attempts))) + // modification time + setLastModify(writer, info) + + writer.Header().Set("Age", strconv.FormatInt(int64(time.Since(info.CreatedAt)/time.Second), 10)) + if info.Complete { + writer.Header().Set("X-Status", "complete") + } else { + writer.Header().Set("X-Status", "processing") + } + + if len(info.Attempts) > 0 { + writer.Header().Set("X-Last-Attempt", info.Attempts[len(info.Attempts)-1].ID) + writer.Header().Set("X-Last-Attempt-At", info.Attempts[len(info.Attempts)-1].CreatedAt.Format(time.RFC850)) + } + + if info.Complete { + lastAttempt := info.Attempts[len(info.Attempts)-1] + request.URL.Path += "/attempt/" + lastAttempt.ID + writer.Header().Set("Location", request.URL.String()) + } + writer.WriteHeader(http.StatusOK) + _, _ = writer.Write(data) + } +} + +func getComplete(wrk *worker.Worker) http.HandlerFunc { + return func(writer http.ResponseWriter, request *http.Request) { + params := mux.Vars(request) + requestID := params["id"] + info, err := wrk.Meta().Get(requestID) + if err != nil { + log.Println("failed access request", requestID, ":", err) + http.NotFound(writer, request) + return + } + if !info.Complete { + http.NotFound(writer, request) + return + } + lastAttempt := info.Attempts[len(info.Attempts)-1] + http.Redirect(writer, request, "attempt/"+lastAttempt.ID, http.StatusMovedPermanently) + } +} + +func getAttempt(wrk *worker.Worker) http.HandlerFunc { + return func(writer http.ResponseWriter, request *http.Request) { + params := mux.Vars(request) + requestID := params["id"] + attemptID := params["attemptId"] + info, err := wrk.Meta().Get(requestID) + if err != nil { + log.Println("failed access request", requestID, ":", err) + http.NotFound(writer, request) + return + } + var attempt meta.Attempt + var found bool + for _, atp := range info.Attempts { + if atp.ID == attemptID { + found = true + attempt = atp + break + } + } + if !found { + http.NotFound(writer, request) + return + } + body, err := wrk.Blobs().Get(attempt.ID) + if err != nil { + log.Println("failed to get body:", err) + http.Error(writer, "get body", http.StatusInternalServerError) + return + } + defer body.Close() + writer.Header().Set("Last-Modified", attempt.CreatedAt.Format(time.RFC850)) + if info.Complete { + writer.Header().Set("X-Status", "complete") + } else { + writer.Header().Set("X-Status", "processing") + } + writer.Header().Set("X-Processed", "true") + for k, v := range attempt.Headers { + writer.Header()[k] = v + } + writer.WriteHeader(attempt.Code) + _, _ = io.Copy(writer, body) + } +} + +func getRequest(wrk *worker.Worker) http.HandlerFunc { + return func(writer http.ResponseWriter, request *http.Request) { + params := mux.Vars(request) + requestID := params["id"] + info, err := wrk.Meta().Get(requestID) + if err != nil { + log.Println("failed access request", requestID, ":", err) + http.NotFound(writer, request) + return + } + writer.Header().Set("Last-Modified", info.CreatedAt.Format(time.RFC850)) + f, err := wrk.Blobs().Get(requestID) + if err != nil { + log.Println("failed to get data:", err) + http.Error(writer, "data", http.StatusInternalServerError) + return + } + defer f.Close() + + writer.Header().Set("X-Method", info.Method) + writer.Header().Set("X-Request-Uri", info.URI) + + for k, v := range info.Headers { + writer.Header()[k] = v + } + writer.WriteHeader(http.StatusOK) + _, _ = io.Copy(writer, f) + } +} + +func setLastModify(writer http.ResponseWriter, info *meta.Request) { + if info.Complete { + writer.Header().Set("Last-Modified", info.CompleteAt.Format(time.RFC850)) + } else if len(info.Attempts) > 0 { + writer.Header().Set("Last-Modified", info.Attempts[len(info.Attempts)-1].CreatedAt.Format(time.RFC850)) + } else { + writer.Header().Set("Last-Modified", info.CreatedAt.Format(time.RFC850)) + } +} diff --git a/server/internal/flags.go b/server/internal/flags.go new file mode 100644 index 0000000..7278639 --- /dev/null +++ b/server/internal/flags.go @@ -0,0 +1,9 @@ +// +build !linux + +package internal + +import "os/exec" + +func SetBinFlags(cmd *exec.Cmd) { + +} diff --git a/server/internal/flags_linux.go b/server/internal/flags_linux.go new file mode 100644 index 0000000..5587dab --- /dev/null +++ b/server/internal/flags_linux.go @@ -0,0 +1,14 @@ +package internal + +import ( + "os/exec" + "syscall" +) + +func SetBinFlags(cmd *exec.Cmd) { + if cmd.SysProcAttr == nil { + cmd.SysProcAttr = &syscall.SysProcAttr{} + } + cmd.SysProcAttr.Pdeathsig = syscall.SIGTERM + cmd.SysProcAttr.Setpgid = true +} diff --git a/server/mode_bin.go b/server/mode_bin.go new file mode 100644 index 0000000..ce467c2 --- /dev/null +++ b/server/mode_bin.go @@ -0,0 +1,96 @@ +package server + +import ( + "context" + "io/ioutil" + "net/http" + "os" + "os/exec" + "strconv" + "strings" + "time" + + "nano-run/server/internal" +) + +type markerResponse struct { + dataSent bool + res http.ResponseWriter +} + +func (m *markerResponse) Header() http.Header { + return m.res.Header() +} + +func (m *markerResponse) Write(bytes []byte) (int, error) { + m.dataSent = true + return m.res.Write(bytes) +} + +func (m *markerResponse) WriteHeader(statusCode int) { + m.dataSent = true + m.res.WriteHeader(statusCode) +} + +type binHandler struct { + command string + workDir string + shell string + environment []string + timeout time.Duration +} + +func (bh *binHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + marker := &markerResponse{res: writer} + + ctx := request.Context() + if bh.timeout > 0 { + c, cancel := context.WithTimeout(ctx, bh.timeout) + defer cancel() + ctx = c + } + + cmd := exec.CommandContext(ctx, bh.shell, "-c", bh.command) //nolint:gosec + + if bh.workDir == "" { + tmpDir, err := ioutil.TempDir("", "") + if err != nil { + http.Error(writer, err.Error(), http.StatusInternalServerError) + return + } + defer os.RemoveAll(tmpDir) + cmd.Dir = tmpDir + } else { + cmd.Dir = bh.workDir + } + + var env = bh.cloneEnv() + for k, v := range request.Header { + ke := strings.ToUpper(strings.Replace(k, "-", "_", -1)) + env = append(env, ke+"="+strings.Join(v, ",")) + } + + cmd.Stderr = os.Stderr + cmd.Stdin = request.Body + cmd.Stdout = marker + cmd.Env = env + internal.SetBinFlags(cmd) + err := cmd.Run() + + if marker.dataSent { + return + } + if err != nil { + writer.Header().Set("X-Return-Code", strconv.Itoa(cmd.ProcessState.ExitCode())) + writer.WriteHeader(http.StatusBadGateway) + } else { + writer.Header().Set("X-Return-Code", strconv.Itoa(cmd.ProcessState.ExitCode())) + writer.WriteHeader(http.StatusNoContent) + } +} + +func (bh *binHandler) cloneEnv() []string { + var cp = make([]string, len(bh.environment)) + copy(cp, bh.environment) + return cp +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..4e5d463 --- /dev/null +++ b/server/server.go @@ -0,0 +1,144 @@ +package server + +import ( + "context" + "io" + "io/ioutil" + "log" + "net/http" + "os" + "path/filepath" + "sync" + "time" + + "gopkg.in/yaml.v2" +) + +type Config struct { + WorkingDirectory string `yaml:"working_directory"` + ConfigDirectory string `yaml:"config_directory"` + Bind string `yaml:"bind"` + GracefulShutdown time.Duration `yaml:"graceful_shutdown"` + TLS struct { + Enable bool `yaml:"enable"` + Cert string `yaml:"cert"` + Key string `yaml:"key"` + } `yaml:"tls,omitempty"` +} + +const ( + defaultGracefulShutdown = 5 * time.Second + defaultBind = "127.0.0.1:8989" +) + +func DefaultConfig() Config { + var cfg Config + cfg.Bind = defaultBind + cfg.WorkingDirectory = filepath.Join("run") + cfg.ConfigDirectory = filepath.Join("conf.d") + cfg.GracefulShutdown = defaultGracefulShutdown + return cfg +} + +func (cfg Config) CreateDirs() error { + err := os.MkdirAll(cfg.WorkingDirectory, 0755) + if err != nil { + return err + } + return os.MkdirAll(cfg.ConfigDirectory, 0755) +} + +func (cfg *Config) LoadFile(file string) error { + data, err := ioutil.ReadFile(file) + if err != nil { + return err + } + err = yaml.Unmarshal(data, cfg) + if err != nil { + return err + } + if !filepath.IsAbs(cfg.WorkingDirectory) { + cfg.WorkingDirectory = filepath.Join(filepath.Dir(file), cfg.WorkingDirectory) + } + if !filepath.IsAbs(cfg.ConfigDirectory) { + cfg.ConfigDirectory = filepath.Join(filepath.Dir(file), cfg.ConfigDirectory) + } + return nil +} + +func (cfg Config) SaveFile(file string) error { + data, err := yaml.Marshal(cfg) + if err != nil { + return err + } + return ioutil.WriteFile(file, data, 0600) +} + +func (cfg Config) Run(global context.Context) error { + units, err := Units(cfg.ConfigDirectory) + if err != nil { + return err + } + workers, err := Workers(cfg.WorkingDirectory, units) + if err != nil { + return err + } + defer func() { + for _, wrk := range workers { + wrk.Close() + } + }() + handler := Handler(units, workers) + + ctx, cancel := context.WithCancel(global) + + server := http.Server{ + Addr: cfg.Bind, + Handler: handler, + } + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + <-ctx.Done() + t, c := context.WithTimeout(context.Background(), cfg.GracefulShutdown) + _ = server.Shutdown(t) + c() + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + err := Run(ctx, workers) + if err != nil { + log.Println("workers stopped:", err) + } + }() + if cfg.TLS.Enable { + err = server.ListenAndServeTLS(cfg.TLS.Cert, cfg.TLS.Key) + } else { + err = server.ListenAndServe() + } + cancel() + wg.Wait() + return err +} + +func limitRequest(maxSize int64, handler http.Handler) http.Handler { + return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + body := request.Body + defer body.Close() + if request.ContentLength > maxSize { + http.Error(writer, "too big request", http.StatusBadRequest) + return + } + + limiter := io.LimitReader(request.Body, maxSize) + request.Body = ioutil.NopCloser(limiter) + handler.ServeHTTP(writer, request) + }) +} diff --git a/server/unit.go b/server/unit.go new file mode 100644 index 0000000..ee035c2 --- /dev/null +++ b/server/unit.go @@ -0,0 +1,265 @@ +package server + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/http/cgi" //nolint:gosec + "net/http/httputil" + "net/url" + "os" + "path/filepath" + "runtime" + "strings" + "sync" + "time" + + "github.com/gorilla/mux" + "gopkg.in/yaml.v2" + + "nano-run/server/internal" + "nano-run/worker" +) + +type Unit struct { + Interval time.Duration `yaml:"interval,omitempty"` // interval between attempts + Attempts int `yaml:"attempts,omitempty"` // maximum number of attempts + Workers int `yaml:"workers,omitempty"` // concurrency level - number of parallel requests + Mode string `yaml:"mode,omitempty"` // execution mode: bin, cgi or proxy + WorkDir string `yaml:"workdir,omitempty"` // working directory for the worker. if empty - temporary one will generated automatically + Command string `yaml:"command"` // command in a shell to execute + Timeout time.Duration `yaml:"timeout,omitempty"` // maximum execution timeout (enabled only for bin mode and only if positive) + Shell string `yaml:"shell,omitempty"` // shell to execute command in bin mode (default - /bin/sh) + Environment map[string]string `yaml:"environment,omitempty"` // custom environment for executable (in addition to system) + MaxRequest int64 `yaml:"max_request,omitempty"` // optional maximum HTTP body size (enabled if positive) + Authorization struct { + JWT struct { + Enable bool `yaml:"enable"` // enable JWT verification + JWT `yaml:",inline"` + } `yaml:"jwt,omitempty"` // HMAC256 JWT verification with shared secret + + QueryToken struct { + Enable bool `yaml:"enable"` // enable query-based token access + QueryToken `yaml:",inline"` + } `yaml:"query_token,omitempty"` // plain API tokens in request query params + + HeaderToken struct { + Enable bool `yaml:"enable"` // enable header-based token access + HeaderToken `yaml:",inline"` + } `yaml:"header_token,omitempty"` // plain API tokens in request header + + Basic struct { + Enable bool `yaml:"enable"` // enable basic verification + Basic `yaml:",inline"` + } `yaml:"basic,omitempty"` // basic authorization + } `yaml:"authorization,omitempty"` + name string +} + +const ( + defaultRequestSize = 1 * 1024 * 1024 // 1MB + defaultAttempts = 3 + defaultInterval = 5 * time.Second + defaultWorkers = 1 + defaultShell = "/bin/sh" + defaultMode = "bin" + defaultCommand = "echo hello world" + defaultName = "main" +) + +func DefaultUnit() Unit { + return Unit{ + Interval: defaultInterval, + Attempts: defaultAttempts, + Workers: defaultWorkers, + MaxRequest: defaultRequestSize, + Shell: defaultShell, + Mode: defaultMode, + Command: defaultCommand, + name: defaultName, + } +} + +func (cfg Unit) Validate() error { + var checks []string + if cfg.Interval < 0 { + checks = append(checks, "negative interval") + } + if cfg.Attempts < 0 { + checks = append(checks, "negative attempts") + } + if cfg.Workers < 0 { + checks = append(checks, "negative workers amount") + } + if !(cfg.Mode == "bin" || cfg.Mode == "cgi" || cfg.Mode == "proxy") { + checks = append(checks, "unknown mode "+cfg.Mode) + } + if len(checks) == 0 { + return nil + } + return errors.New(strings.Join(checks, ", ")) +} + +func (cfg Unit) SaveFile(file string) error { + data, err := yaml.Marshal(cfg) + if err != nil { + return err + } + return ioutil.WriteFile(file, data, 0600) +} + +func Units(configsDir string) ([]Unit, error) { + var configs []Unit + err := filepath.Walk(configsDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + name := info.Name() + if !(strings.HasSuffix(name, ".yaml") || strings.HasSuffix(name, ".yml")) { + return nil + } + unitName := strings.ReplaceAll(strings.Trim(path[len(configsDir):strings.LastIndex(path, ".")], "/\\"), string(filepath.Separator), "-") + cfg := DefaultUnit() + cfg.name = unitName + data, err := ioutil.ReadFile(path) + if err != nil { + return err + } + err = yaml.Unmarshal(data, &cfg) + if err != nil { + return err + } + configs = append(configs, cfg) + return nil + }) + return configs, err +} + +func Workers(workdir string, configurations []Unit) ([]*worker.Worker, error) { + var ans []*worker.Worker + for _, cfg := range configurations { + log.Println("validating", cfg.name) + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("configuration invalid for %s: %w", cfg.name, err) + } + if cfg.Workers == 0 { + cfg.Workers = runtime.NumCPU() + } + wrk, err := cfg.worker(workdir) + if err != nil { + for _, w := range ans { + w.Close() + } + return nil, err + } + ans = append(ans, wrk) + } + return ans, nil +} + +func Handler(units []Unit, workers []*worker.Worker) http.Handler { + router := mux.NewRouter() + for i, unit := range units { + prefix := "/" + unit.name + "/" + subRouter := router.PathPrefix(prefix).Subrouter() + subRouter.Use(unit.enableAuthorization()) + internal.Expose(subRouter, workers[i]) + } + return router +} + +func Run(global context.Context, workers []*worker.Worker) error { + if len(workers) == 0 { + <-global.Done() + return global.Err() + } + + ctx, cancel := context.WithCancel(global) + defer cancel() + var wg sync.WaitGroup + + for _, wrk := range workers { + wg.Add(1) + go func(wrk *worker.Worker) { + err := wrk.Run(ctx) + if err != nil { + log.Println("failed:", err) + } + wg.Done() + }(wrk) + } + + wg.Wait() + return ctx.Err() +} + +func (cfg Unit) worker(root string) (*worker.Worker, error) { + handler, err := cfg.handler() + if err != nil { + return nil, err + } + workdir := filepath.Join(root, cfg.name) + wrk, err := worker.Default(workdir) + if err != nil { + return nil, err + } + wrk = wrk.Attempts(cfg.Attempts).Interval(cfg.Interval).Concurrency(cfg.Workers).Handler(handler) + return wrk, nil +} + +func (cfg Unit) handler() (http.Handler, error) { + handler, err := cfg.createRunner() + if err != nil { + return nil, err + } + if cfg.MaxRequest > 0 { + handler = limitRequest(cfg.MaxRequest, handler) + } + //TODO: add authorization + return handler, nil +} + +func (cfg Unit) createRunner() (http.Handler, error) { + switch cfg.Mode { + case "bin": + return &binHandler{ + command: cfg.Command, + workDir: cfg.WorkDir, + shell: cfg.Shell, + timeout: cfg.Timeout, + environment: append(os.Environ(), makeEnvList(cfg.Environment)...), + }, nil + case "cgi": + return &cgi.Handler{ + Path: cfg.Shell, + Dir: cfg.WorkDir, + Env: append(os.Environ(), makeEnvList(cfg.Environment)...), + Logger: log.New(os.Stderr, "[cgi] ", log.LstdFlags), + Args: []string{"-c", cfg.Command}, + Stderr: os.Stderr, + }, nil + case "proxy": + // proxy to static URL + u, err := url.Parse(cfg.Command) + if err != nil { + return nil, err + } + return httputil.NewSingleHostReverseProxy(u), nil + default: + return nil, fmt.Errorf("unknown mode %s", cfg.Mode) + } +} + +func makeEnvList(content map[string]string) []string { + var ans = make([]string, 0, len(content)) + for k, v := range content { + ans = append(ans, k+"="+v) + } + return ans +} diff --git a/services/blob/fsblob/impl.go b/services/blob/fsblob/impl.go new file mode 100644 index 0000000..ce89899 --- /dev/null +++ b/services/blob/fsblob/impl.go @@ -0,0 +1,80 @@ +package fsblob + +import ( + "fmt" + "io" + "os" + "path/filepath" + + "nano-run/services/blob" +) + +// Dummy file storage: large storage based on file system. +// ID - just name of file, but content will be written atomically. +func New(rootDir string) blob.Blob { + return &fsBlob{ + rootDir: rootDir, + checker: func(id string) bool { + return true + }, + } +} + +func NewCheck(rootDir string, checkFn func(string) bool) blob.Blob { + return &fsBlob{ + rootDir: rootDir, + checker: checkFn, + } +} + +type fsBlob struct { + rootDir string + checker func(id string) bool +} + +func (k *fsBlob) Push(id string, handler func(out io.Writer) error) error { + if !k.checker(id) { + return fmt.Errorf("push: invalid id %s", id) + } + dir := filepath.Join(k.rootDir, id[0:1]) + err := os.MkdirAll(dir, 0755) + if err != nil { + return err + } + tempFile := filepath.Join(dir, id+".temp") + destFile := filepath.Join(dir, id) + + tempF, err := os.Create(tempFile) + if err != nil { + return fmt.Errorf("fsblob: put data: create temp file: %w", err) + } + err = handler(tempF) + flushErr := tempF.Close() + if err != nil { + _ = os.Remove(tempFile) + return err + } + if flushErr != nil { + _ = os.Remove(tempFile) + return fmt.Errorf("fsblob: put data: flush content to temp file: %w", err) + } + + err = os.Rename(tempFile, destFile) + if err != nil { + return fmt.Errorf("fsblob: put data: commit file: %w", err) + } + return nil +} + +func (k *fsBlob) Get(id string) (io.ReadCloser, error) { + if !k.checker(id) { + return nil, fmt.Errorf("get: invalid id %s", id) + } + dir := filepath.Join(k.rootDir, id[0:1]) + destFile := filepath.Join(dir, id) + f, err := os.Open(destFile) + if err == nil { + return f, nil + } + return nil, fmt.Errorf("fsblob: get file: %w", err) +} diff --git a/services/blob/interface.go b/services/blob/interface.go new file mode 100644 index 0000000..6efc49d --- /dev/null +++ b/services/blob/interface.go @@ -0,0 +1,11 @@ +package blob + +import "io" + +// Large (more then memory) content storage. +type Blob interface { + // Push content to the storage using provided writer. + Push(id string, handler func(out io.Writer) error) error + // Get content from the storage. + Get(id string) (io.ReadCloser, error) +} diff --git a/services/meta/interface.go b/services/meta/interface.go new file mode 100644 index 0000000..0300f36 --- /dev/null +++ b/services/meta/interface.go @@ -0,0 +1,35 @@ +package meta + +import ( + "net/http" + "time" +) + +type Meta interface { + Get(requestID string) (*Request, error) + CreateRequest(requestID string, headers http.Header, uri string, method string) error + AddAttempt(requestID, attemptID string, header AttemptHeader) (*Request, error) + Complete(requestID string) error + Iterate(handler func(id string, record Request) error) error +} + +type AttemptHeader struct { + Code int `json:"code"` + Headers http.Header `json:"headers"` +} + +type Attempt struct { + AttemptHeader + ID string `json:"id"` + CreatedAt time.Time `json:"created_at"` +} + +type Request struct { + CreatedAt time.Time `json:"created_at"` + CompleteAt time.Time `json:"complete_at,omitempty"` + Attempts []Attempt `json:"attempts"` + Headers http.Header `json:"headers"` + URI string `json:"uri"` + Method string `json:"method"` + Complete bool `json:"complete"` +} diff --git a/services/meta/micrometa/request_meta_storage.go b/services/meta/micrometa/request_meta_storage.go new file mode 100644 index 0000000..edfbdc4 --- /dev/null +++ b/services/meta/micrometa/request_meta_storage.go @@ -0,0 +1,148 @@ +package micrometa + +import ( + "encoding/json" + "log" + "net/http" + "os" + "path/filepath" + "time" + + "github.com/dgraph-io/badger" + + "nano-run/internal" + "nano-run/services/meta" +) + +func WrapMetaStorage(db *badger.DB) *MicroMeta { + return &MicroMeta{db: db, wrapped: true} +} + +func NewMetaStorage(location string) (*MicroMeta, error) { + name := filepath.Base(location) + db, err := badger.Open(badger.DefaultOptions(location). + WithLogger(internal.NanoLogger(log.New(os.Stderr, "["+name+"] ", log.LstdFlags)))) + if err != nil { + return nil, err + } + return &MicroMeta{ + db: db, + wrapped: false, + }, nil +} + +type MicroMeta struct { + db *badger.DB + wrapped bool +} + +func (rms *MicroMeta) Get(requestID string) (*meta.Request, error) { + var ans meta.Request + return &ans, rms.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(requestID)) + if err != nil { + return err + } + return item.Value(func(val []byte) error { + return json.Unmarshal(val, &ans) + }) + }) +} + +func (rms *MicroMeta) CreateRequest(requestID string, headers http.Header, uri string, method string) error { + var record = meta.Request{ + CreatedAt: time.Now(), + Attempts: make([]meta.Attempt, 0), + Complete: false, + Headers: headers, + URI: uri, + Method: method, + } + data, err := json.Marshal(record) + if err != nil { + return err + } + return rms.db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(requestID), data) + }) +} + +func (rms *MicroMeta) AddAttempt(requestID, attemptID string, header meta.AttemptHeader) (*meta.Request, error) { + var ans meta.Request + err := rms.updateRequest(requestID, func(record *meta.Request) error { + record.Attempts = append(record.Attempts, meta.Attempt{ + ID: attemptID, + CreatedAt: time.Now(), + AttemptHeader: header, + }) + ans = *record + return nil + }) + return &ans, err +} + +func (rms *MicroMeta) Complete(requestID string) error { + return rms.updateRequest(requestID, func(record *meta.Request) error { + record.Complete = true + record.CompleteAt = time.Now() + return nil + }) +} + +func (rms *MicroMeta) Close() error { + if rms.wrapped { + return nil + } + return rms.db.Close() +} + +func (rms *MicroMeta) Iterate(handler func(id string, record meta.Request) error) error { + return rms.db.View(func(txn *badger.Txn) error { + iter := txn.NewIterator(badger.DefaultIteratorOptions) + iter.Rewind() + defer iter.Close() + for iter.Valid() { + id := string(iter.Item().Key()) + var rec meta.Request + err := iter.Item().Value(func(val []byte) error { + return json.Unmarshal(val, &rec) + }) + if err != nil { + return err + } + err = handler(id, rec) + if err != nil { + return err + } + iter.Next() + } + return nil + }) +} + +func (rms *MicroMeta) updateRequest(requestID string, tx func(record *meta.Request) error) error { + return rms.db.Update(func(txn *badger.Txn) error { + data, err := txn.Get([]byte(requestID)) + if err != nil { + return err + } + var record meta.Request + err = data.Value(func(val []byte) error { + return json.Unmarshal(val, &record) + }) + if err != nil { + return err + } + + err = tx(&record) + if err != nil { + return err + } + + value, err := json.Marshal(record) + if err != nil { + return err + } + return txn.Set([]byte(requestID), value) + }) +} diff --git a/services/queue/interface.go b/services/queue/interface.go new file mode 100644 index 0000000..5e3bebc --- /dev/null +++ b/services/queue/interface.go @@ -0,0 +1,8 @@ +package queue + +import "context" + +type Queue interface { + Push(payload []byte) error + Get(ctx context.Context) ([]byte, error) +} diff --git a/services/queue/microqueue/micro_queue.go b/services/queue/microqueue/micro_queue.go new file mode 100644 index 0000000..d5cf71d --- /dev/null +++ b/services/queue/microqueue/micro_queue.go @@ -0,0 +1,123 @@ +package microqueue + +import ( + "context" + "encoding/binary" + "errors" + "log" + "os" + "path/filepath" + "sync/atomic" + + "github.com/dgraph-io/badger" + + "nano-run/internal" +) + +var ErrEmptyQueue = errors.New("empty queue") + +func WrapMicroQueue(db *badger.DB) (*MicroQueue, error) { + mc := &MicroQueue{db: db, wrapped: true, notify: make(chan struct{}, 1), close: make(chan struct{})} + return mc, mc.db.DropAll() +} + +func NewMicroQueue(location string) (*MicroQueue, error) { + name := filepath.Base(location) + db, err := badger.Open(badger.DefaultOptions(location).WithTruncate(true).WithLogger(internal.NanoLogger(log.New(os.Stderr, "["+name+"] ", log.LstdFlags)))) + if err != nil { + return nil, err + } + return &MicroQueue{db: db, notify: make(chan struct{}, 1), close: make(chan struct{})}, nil +} + +// Always fresh queue with offloading to fs if no readers. +// Optimized for multiple readers and multiple writers with number of items limited by FS +// and each value should fit to RAM. +type MicroQueue struct { + db *badger.DB + sequence uint64 + wrapped bool + notify chan struct{} + close chan struct{} +} + +func (mq *MicroQueue) Push(payload []byte) error { + id := atomic.AddUint64(&mq.sequence, 1) + var key [8]byte + binary.BigEndian.PutUint64(key[:], id) + err := mq.db.Update(func(txn *badger.Txn) error { + return txn.Set(key[:], payload) + }) + if err != nil { + return err + } + mq.sendNotify() + return nil +} + +func (mq *MicroQueue) pop() ([]byte, error) { + var ans []byte + err := mq.db.Update(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.IteratorOptions{ + PrefetchValues: true, + PrefetchSize: 1, + Reverse: false, + AllVersions: false, + }) + defer it.Close() + it.Rewind() + + if !it.Valid() { + return ErrEmptyQueue + } + v, err := it.Item().ValueCopy(ans) + if err != nil { + return err + } + ans = v + + return txn.Delete(it.Item().Key()) + }) + if err != nil { + return nil, err + } + mq.sendNotify() + return ans, nil +} + +// Get blocking for new item in a queue. +func (mq *MicroQueue) Get(ctx context.Context) ([]byte, error) { + mq.sendNotify() + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-mq.close: + return nil, errors.New("queue closed") + case <-mq.notify: + v, err := mq.pop() + if err == nil { + return v, nil + } + if errors.Is(err, ErrEmptyQueue) { + continue + } + return nil, err + } + } +} + +func (mq *MicroQueue) Close() error { + close(mq.close) + if mq.wrapped { + return nil + } + return mq.db.Close() +} + +func (mq *MicroQueue) sendNotify() { + select { + case mq.notify <- struct{}{}: + default: + } +} diff --git a/worker/request_io.go b/worker/request_io.go new file mode 100644 index 0000000..ad15acb --- /dev/null +++ b/worker/request_io.go @@ -0,0 +1,37 @@ +package worker + +import ( + "io" + "net/http" + + "nano-run/services/meta" +) + +func openResponse(writer io.Writer) *responseStream { + return &responseStream{stream: writer, meta: meta.AttemptHeader{Headers: make(http.Header)}} +} + +type responseStream struct { + meta meta.AttemptHeader + statusSent bool + stream io.Writer +} + +func (mo *responseStream) Header() http.Header { + return mo.meta.Headers +} + +func (mo *responseStream) Write(bytes []byte) (int, error) { + if !mo.statusSent { + mo.WriteHeader(http.StatusOK) + } + return mo.stream.Write(bytes) +} + +func (mo *responseStream) WriteHeader(statusCode int) { + if mo.statusSent { + return + } + mo.statusSent = true + mo.meta.Code = statusCode +} diff --git a/worker/worker.go b/worker/worker.go new file mode 100644 index 0000000..2e90219 --- /dev/null +++ b/worker/worker.go @@ -0,0 +1,415 @@ +package worker + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "path/filepath" + "runtime" + "sync" + "time" + + "github.com/google/uuid" + + "nano-run/services/blob" + "nano-run/services/blob/fsblob" + "nano-run/services/meta" + "nano-run/services/meta/micrometa" + "nano-run/services/queue" + "nano-run/services/queue/microqueue" +) + +type ( + CompleteHandler func(ctx context.Context, requestID string, info *meta.Request) + ProcessHandler func(ctx context.Context, requestID, attemptID string, info *meta.Request) +) + +const ( + defaultAttempts = 3 + defaultInterval = 3 * time.Second + minimalFailedCode = 500 +) + +func Default(location string) (*Worker, error) { + path := filepath.Join(location, "blobs") + err := os.MkdirAll(path, 0755) + if err != nil { + return nil, err + } + storage := fsblob.NewCheck(path, func(id string) bool { + _, err := uuid.Parse(id) + return err == nil + }) + + taskQueue, err := microqueue.NewMicroQueue(filepath.Join(location, "queue")) + if err != nil { + return nil, err + } + requeue, err := microqueue.NewMicroQueue(filepath.Join(location, "requeue")) + if err != nil { + return nil, err + } + metaStorage, err := micrometa.NewMetaStorage(filepath.Join(location, "meta")) + if err != nil { + return nil, err + } + + cleanup := func() { + _ = requeue.Close() + _ = taskQueue.Close() + _ = metaStorage.Close() + } + + wrk, err := New(taskQueue, requeue, storage, metaStorage) + if err != nil { + cleanup() + return nil, err + } + wrk.cleanup = cleanup + return wrk, nil +} + +func New(tasks, requeue queue.Queue, blobs blob.Blob, meta meta.Meta) (*Worker, error) { + wrk := &Worker{ + queue: tasks, + requeue: requeue, + blob: blobs, + meta: meta, + maxAttempts: defaultAttempts, + interval: defaultInterval, + concurrency: runtime.NumCPU(), + handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + writer.WriteHeader(http.StatusNoContent) + }), + } + err := wrk.init() + if err != nil { + return nil, err + } + return wrk, nil +} + +type Worker struct { + queue queue.Queue + requeue queue.Queue + blob blob.Blob + meta meta.Meta + handler http.Handler + cleanup func() + + onDead CompleteHandler + onSuccess CompleteHandler + onProcess ProcessHandler + maxAttempts int + concurrency int + interval time.Duration +} + +func (mgr *Worker) init() error { + return mgr.meta.Iterate(func(id string, record meta.Request) error { + if !record.Complete { + log.Println("found incomplete job", id) + return mgr.queue.Push([]byte(id)) + } + return nil + }) +} + +func (mgr *Worker) Close() { + if fn := mgr.cleanup; fn != nil { + fn() + } +} + +func (mgr *Worker) Enqueue(req *http.Request) (string, error) { + id, err := mgr.saveRequest(req) + if err != nil { + return "", err + } + log.Println("new request saved:", id) + err = mgr.queue.Push([]byte(id)) + return id, err +} + +func (mgr *Worker) OnSuccess(handler CompleteHandler) *Worker { + mgr.onSuccess = handler + return mgr +} + +func (mgr *Worker) OnDead(handler CompleteHandler) *Worker { + mgr.onDead = handler + return mgr +} + +func (mgr *Worker) OnProcess(handler ProcessHandler) *Worker { + mgr.onProcess = handler + return mgr +} + +func (mgr *Worker) Handler(handler http.Handler) *Worker { + mgr.handler = handler + return mgr +} + +func (mgr *Worker) HandlerFunc(fn http.HandlerFunc) *Worker { + mgr.handler = fn + return mgr +} + +// Attempts number of 500x requests. +func (mgr *Worker) Attempts(max int) *Worker { + mgr.maxAttempts = max + return mgr +} + +// Interval between attempts. +func (mgr *Worker) Interval(duration time.Duration) *Worker { + mgr.interval = duration + return mgr +} + +// Concurrency limit (number of parallel tasks). Does not affect already running worker. +// 0 means num CPU. +func (mgr *Worker) Concurrency(num int) *Worker { + mgr.concurrency = num + if num == 0 { + mgr.concurrency = runtime.NumCPU() + } + return mgr +} + +// Meta information about requests. +func (mgr *Worker) Meta() meta.Meta { + return mgr.meta +} + +// Blobs storage (for large objects). +func (mgr *Worker) Blobs() blob.Blob { + return mgr.blob +} + +func (mgr *Worker) Run(global context.Context) error { + if mgr.interval < 0 { + return fmt.Errorf("negative interval") + } + if mgr.maxAttempts < 0 { + return fmt.Errorf("negative attempts") + } + if mgr.handler == nil { + return fmt.Errorf("nil handler") + } + if mgr.concurrency <= 0 { + return fmt.Errorf("invalid concurrency number") + } + ctx, cancel := context.WithCancel(global) + defer cancel() + var wg sync.WaitGroup + for i := 0; i < mgr.concurrency; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + defer cancel() + err := mgr.runQueue(ctx) + if err != nil { + log.Println("worker", i, "stopped due to error:", err) + } else { + log.Println("worker", i, "stopped") + } + }(i) + } + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + err := mgr.runReQueue(ctx) + if err != nil { + log.Println("re-queue process stopped due to error:", err) + } else { + log.Println("re-queue process stopped") + } + }() + wg.Wait() + return ctx.Err() +} + +func (mgr *Worker) call(ctx context.Context, requestID string, info *meta.Request) error { + // caller should ensure that request id is valid + f, err := mgr.blob.Get(requestID) + if err != nil { + return err + } + defer f.Close() + + req, err := http.NewRequestWithContext(ctx, info.Method, info.URI, f) + if err != nil { + return err + } + for k, v := range info.Headers { + req.Header[k] = v + } + + attemptID := uuid.New().String() + + var header meta.AttemptHeader + + err = mgr.blob.Push(attemptID, func(out io.Writer) error { + res := openResponse(out) + mgr.handler.ServeHTTP(res, req) + header = res.meta + return nil + }) + if err != nil { + return err + } + + info, err = mgr.meta.AddAttempt(requestID, attemptID, header) + if err != nil { + return err + } + + mgr.requestProcessed(ctx, requestID, attemptID, info) + if header.Code >= minimalFailedCode { + return fmt.Errorf("500 code returned: %d", header.Code) + } + return nil +} + +func (mgr *Worker) runQueue(ctx context.Context) error { + for { + err := mgr.processQueueItem(ctx) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + } +} + +func (mgr *Worker) runReQueue(ctx context.Context) error { + for { + err := mgr.processReQueueItem(ctx) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + } +} + +func (mgr *Worker) processQueueItem(ctx context.Context) error { + bid, err := mgr.queue.Get(ctx) + if err != nil { + return err + } + id := string(bid) + log.Println("processing request", id) + info, err := mgr.meta.Get(id) + if err != nil { + return fmt.Errorf("get request %s meta info: %w", id, err) + } + if info.Complete { + return fmt.Errorf("request %s already complete", id) + } + err = mgr.call(ctx, id, info) + if err == nil { + mgr.requestSuccess(ctx, id, info) + return nil + } + return mgr.requeueItem(ctx, id, info) +} + +func (mgr *Worker) processReQueueItem(ctx context.Context) error { + var item requeueItem + + data, err := mgr.requeue.Get(ctx) + if err != nil { + return err + } + err = json.Unmarshal(data, &item) + + if err != nil { + return err + } + + d := time.Since(item.At) + if d < mgr.interval { + select { + case <-time.After(mgr.interval - d): + case <-ctx.Done(): + return ctx.Err() + } + } + return mgr.queue.Push([]byte(item.ID)) +} + +func (mgr *Worker) requeueItem(ctx context.Context, id string, info *meta.Request) error { + if len(info.Attempts) >= mgr.maxAttempts { + mgr.requestDead(ctx, id, info) + log.Println("maximum attempts reached for request", id) + return nil + } + data, err := json.Marshal(requeueItem{ + At: time.Now(), + ID: id, + }) + if err != nil { + return err + } + return mgr.requeue.Push(data) +} + +func (mgr *Worker) saveRequest(req *http.Request) (string, error) { + id := uuid.New().String() + err := mgr.blob.Push(id, func(out io.Writer) error { + _, err := io.Copy(out, req.Body) + return err + }) + if err != nil { + return "", err + } + return id, mgr.meta.CreateRequest(id, req.Header, req.RequestURI, req.Method) +} + +func (mgr *Worker) requestDead(ctx context.Context, id string, info *meta.Request) { + err := mgr.meta.Complete(id) + if err != nil { + log.Println("failed complete (dead) request:", err) + } + if handler := mgr.onDead; handler != nil { + handler(ctx, id, info) + } + log.Println("request", id, "completely failed") +} + +func (mgr *Worker) requestSuccess(ctx context.Context, id string, info *meta.Request) { + err := mgr.meta.Complete(id) + if err != nil { + log.Println("failed complete (success) request:", err) + } + if handler := mgr.onSuccess; handler != nil { + handler(ctx, id, info) + } + log.Println("request", id, "complete successfully") +} + +func (mgr *Worker) requestProcessed(ctx context.Context, id string, attemptID string, info *meta.Request) { + if handler := mgr.onProcess; handler != nil { + handler(ctx, id, attemptID, info) + } + log.Println("request", id, "processed with attempt", attemptID) +} + +type requeueItem struct { + At time.Time + ID string +}