From 4edfaa4d26b631f7f42da75dc8bfd80aeb429d9d Mon Sep 17 00:00:00 2001 From: Alexander Baryshnikov Date: Thu, 10 Sep 2020 18:11:34 +0800 Subject: [PATCH] initial code added --- .dockerignore | 7 + .gitignore | 4 + .golangci.yml | 41 ++ .goreleaser.yml | 71 +++ Dockerfile | 7 + Dockerfile.build | 13 + README.md | 19 + _docs/api.md | 114 +++++ _docs/authorization.md | 116 +++++ _docs/docker.md | 38 ++ _docs/flow.md | 14 + _docs/flow.txt | 19 + _docs/goals.md | 12 + _docs/internals/worker_flow | 1 + _docs/unit.md | 3 + bundle/debian/nano-run.service | 12 + bundle/debian/postinstall.sh | 12 + bundle/debian/preremove.sh | 14 + bundle/debian/server.yaml | 6 + bundle/docker/server.yaml | 5 + cmd/nano-run/main.go | 50 +++ cmd/nano-run/run_cmd.go | 65 +++ cmd/nano-run/server_cmd.go | 71 +++ cmd/nano-run/signals_default.go | 9 + cmd/nano-run/signals_posix.go | 10 + go.mod | 13 + go.sum | 86 ++++ internal/nano_logger.go | 31 ++ server/auth.go | 129 ++++++ server/internal/adapter.go | 192 ++++++++ server/internal/flags.go | 9 + server/internal/flags_linux.go | 14 + server/mode_bin.go | 96 ++++ server/server.go | 144 ++++++ server/unit.go | 265 +++++++++++ services/blob/fsblob/impl.go | 80 ++++ services/blob/interface.go | 11 + services/meta/interface.go | 35 ++ .../meta/micrometa/request_meta_storage.go | 148 +++++++ services/queue/interface.go | 8 + services/queue/microqueue/micro_queue.go | 123 ++++++ worker/request_io.go | 37 ++ worker/worker.go | 415 ++++++++++++++++++ 43 files changed, 2569 insertions(+) create mode 100644 .dockerignore create mode 100644 .gitignore create mode 100644 .golangci.yml create mode 100644 .goreleaser.yml create mode 100644 Dockerfile create mode 100644 Dockerfile.build create mode 100644 README.md create mode 100644 _docs/api.md create mode 100644 _docs/authorization.md create mode 100644 _docs/docker.md create mode 100644 _docs/flow.md create mode 100644 _docs/flow.txt create mode 100644 _docs/goals.md create mode 100644 _docs/internals/worker_flow create mode 100644 _docs/unit.md create mode 100644 bundle/debian/nano-run.service create mode 100755 bundle/debian/postinstall.sh create mode 100755 bundle/debian/preremove.sh create mode 100755 bundle/debian/server.yaml create mode 100644 bundle/docker/server.yaml create mode 100644 cmd/nano-run/main.go create mode 100644 cmd/nano-run/run_cmd.go create mode 100644 cmd/nano-run/server_cmd.go create mode 100644 cmd/nano-run/signals_default.go create mode 100644 cmd/nano-run/signals_posix.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/nano_logger.go create mode 100644 server/auth.go create mode 100644 server/internal/adapter.go create mode 100644 server/internal/flags.go create mode 100644 server/internal/flags_linux.go create mode 100644 server/mode_bin.go create mode 100644 server/server.go create mode 100644 server/unit.go create mode 100644 services/blob/fsblob/impl.go create mode 100644 services/blob/interface.go create mode 100644 services/meta/interface.go create mode 100644 services/meta/micrometa/request_meta_storage.go create mode 100644 services/queue/interface.go create mode 100644 services/queue/microqueue/micro_queue.go create mode 100644 worker/request_io.go create mode 100644 worker/worker.go diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..47f3bdf --- /dev/null +++ b/.dockerignore @@ -0,0 +1,7 @@ +Dockerfile +/.idea +/_docs +/build +/dist +/run +/conf.d \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e6e702d --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/.idea +/run +/dist +/build \ No newline at end of file diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..0c23a10 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,41 @@ +run: + tests: false +linters: + disable-all: false + enable: + - bodyclose + - deadcode + - depguard + - dogsled + - dupl + - errcheck + - exhaustive + - funlen + - gochecknoinits + - goconst + - gocyclo + - gofmt + - goimports + - golint + - gomnd + - goprintffuncname + - gosec + - gosimple + - govet + - ineffassign + - interfacer + - misspell + - nakedret + - noctx + - nolintlint + - rowserrcheck + - scopelint + - staticcheck + - structcheck + - stylecheck + - typecheck + - unconvert + - unparam + - unused + - varcheck + - whitespace \ No newline at end of file diff --git a/.goreleaser.yml b/.goreleaser.yml new file mode 100644 index 0000000..a35efaa --- /dev/null +++ b/.goreleaser.yml @@ -0,0 +1,71 @@ +project_name: nano-run +builds: + - env: + - CGO_ENABLED=0 + goos: + - linux + - windows + - darwin + goarch: + - amd64 + - arm64 + - arm + flags: + - -trimpath + main: ./cmd/nano-run + +nfpms: + - id: debian + file_name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}{{ if .Arm }}v{{ .Arm }}{{ end }}" + replacements: + Linux: linux + 386: i386 + homepage: https://github.com/reddec/nano-run + maintainer: Baryshnikov Aleksandr + description: Lightweigt runner for web requests + license: Apache-2.0 + formats: + - deb + scripts: + postinstall: "bundle/debian/postinstall.sh" + preremove: "bundle/debian/preremove.sh" + empty_folders: + - /etc/nano-run/conf.d + config_files: + "bundle/debian/server.yaml": "/etc/nano-run/server.yaml" +uploads: + - name: bintray + method: PUT + mode: archive + username: reddec + custom_artifact_name: true + ids: + - debian + target: 'https://api.bintray.com/content/reddec/debian/{{ .ProjectName }}/{{ .Version }}/{{ .ArtifactName }};publish=1;deb_component=main;deb_distribution=all;deb_architecture={{ .Arch }}' +dockers: + - binaries: + - nano-run + dockerfile: Dockerfile + extra_files: + - bundle/docker/server.yaml + build_flag_templates: + - "--label=org.opencontainers.image.created={{.Date}}" + - "--label=org.opencontainers.image.title={{.ProjectName}}" + - "--label=org.opencontainers.image.revision={{.FullCommit}}" + - "--label=org.opencontainers.image.version={{.Version}}" + image_templates: + - "reddec/nano-run:{{ .Tag }}" + - "reddec/nano-run:v{{ .Major }}" + - "reddec/nano-run:v{{ .Major }}.{{ .Minor }}" + - "reddec/nano-run:latest" +checksum: + name_template: 'checksums.txt' +snapshot: + name_template: "{{ .Tag }}-next" +changelog: + sort: asc + filters: + exclude: + - '^docs:' + - '^test:' + - '^build:' diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d4c0e1a --- /dev/null +++ b/Dockerfile @@ -0,0 +1,7 @@ +FROM alpine:3.12 +VOLUME /data +VOLUME /conf.d +EXPOSE 80 +COPY nano-run /bin/nano-run +COPY bundle/docker/server.yaml /server.yaml +CMD ["/bin/nano-run", "server", "run", "-f", "-c", "server.yaml"] \ No newline at end of file diff --git a/Dockerfile.build b/Dockerfile.build new file mode 100644 index 0000000..16d4ed4 --- /dev/null +++ b/Dockerfile.build @@ -0,0 +1,13 @@ +FROM golang:1.15-alpine3.12 AS build +WORKDIR /go/src/app +COPY . . + +RUN go get -d -v ./... +RUN go build -o nano-run -v ./cmd/nano-run/... + +FROM alpine:3.12 +VOLUME /data +VOLUME /conf.d +COPY docker/server.yaml /server.yaml +COPY --from=build /go/src/app/nano-run /bin/nano-run +CMD ["/bin/nano-run", "server", "run", "-f", "-c", "server.yaml"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..d8d641c --- /dev/null +++ b/README.md @@ -0,0 +1,19 @@ +# Nano-Run + +Lightweight async request runner. + +A simplified version of [trusted-cgi](https://github.com/reddec/trusted-cgi) designed +for async processing extreme amount of requests. + +## Goals + +* Minimal requirements for host; +* Should have semi-constant resource consumption regardless of: + * number of requests, + * size of requests, + * kind of requests; +* Should be ready to run without configuration; +* Should be ready for deploying in clouds; +* Should support extending for another providers; +* Can be used as library and as a complete solution; +* **Performance (throughput/latency) has less priority** than resource usage. \ No newline at end of file diff --git a/_docs/api.md b/_docs/api.md new file mode 100644 index 0000000..b7ba39d --- /dev/null +++ b/_docs/api.md @@ -0,0 +1,114 @@ +# API + +Base url: `/{application}` + + +## Start process + +||| +|------------|------| +| **Method** | POST | +| **Path** | / | + +Saves and enqueue request. Return 303 See Other on success with headers. + +* `X-Correlation-Id` - with ID of request +* `Location` - URL to status + +Clients can use cUrl flag `-L` to automatically follow redirect + + curl -L -d '' 'http://127.0.0.1:8989/app/' + +## Get status + + +||| +|------------|------------------| +| **Method** | GET | +| **Path** | /{correlationId} | + +Returns JSON with full meta-data of request and following headers: + +* `Content-Version` - number of attempts +* `Last-Modified` - latest of time of creation, time of last attempt or completion time +* `Location` - URL to the complete attempt +* `X-Status` - status of request processing: `complete` or `processing` +* `X-Last-Attempt` - id of last attempt +* `X-Last-Attempt-At` - time of last attempt +* `X-Correlation-Id` - with ID of request + +Body example: + +```json +{ + "created_at": "2020-09-10T17:11:33.598542177+08:00", + "complete_at": "2020-09-10T17:11:33.616550544+08:00", + "attempts": [ + { + "code": 200, + "headers": {}, + "id": "51748767-e89b-48a1-8b00-b9c1f0fdc9bb", + "created_at": "2020-09-10T17:11:33.614030236+08:00" + } + ], + "headers": { + "Accept": [ + "*/*" + ], + "Content-Length": [ + "0" + ], + "Content-Type": [ + "application/x-www-form-urlencoded" + ], + "User-Agent": [ + "curl/7.68.0" + ] + }, + "uri": "/date/", + "method": "POST", + "complete": true +} +``` + +## Get complete + +||| +|------------|---------------------------| +| **Method** | GET | +| **Path** | /{correlationId}/complete | + +Will redirect to the complete attempt or 404 + + + +## Get attempt + +||| +|------------|--------------------------------------| +| **Method** | GET | +| **Path** | /{correlationId}/attempt/{attemptId} | + +Get result of processing request for the defined attempt + +Returns body, code and headers same as processor returned with additional headers: + +* `X-Status` - status of request processing: `complete` or `processing` +* `X-Processed` - `true` to distinguish result + + + +## Get request + +||| +|------------|--------------------------| +| **Method** | GET | +| **Path** | /{correlationId}/request | + +Get request same as it was POSTed for the defined ID + +Returns body and headers same as processor got from client with additional headers: + +* `X-Method` - request method (currently always `POST`) +* `X-Request-Uri` - request URI +* `Last-Modified` - time of creation diff --git a/_docs/authorization.md b/_docs/authorization.md new file mode 100644 index 0000000..70a1476 --- /dev/null +++ b/_docs/authorization.md @@ -0,0 +1,116 @@ +# Authorization + +By-default - authorization disabled. Multiple policies allowed. +To allow request at least one policy should be passed. +Each authorization policy can enabled by `enable: yes` param. + +Section in `server.yaml`: `authorization` + +## JWT + +*section: `authorization.jwt`* + +[Overview](https://jwt.io/) + +HMAC 256 signature validation against secret key + +Configurable parameters: + +* `header` (optional, string, default: `Authorization`) - header that contains JWT +* `secret` (required, string) - secret key to validate signature + +Example minimal unit config + +```yaml +command: 'echo hello world' +authorization: + jwt: + enable: yes + secret: '$eCrEtKey' +``` + +## Query token + +*section: `authorization.query_token`* + +Plain token in a query string. Will be matched against list of allowed tokens. + +For example, client can invoke endpoint by addition token query: `http://example.com/app/?token=deadbeaf` + +Configurable parameters: + +* `param` (optional, string, default: `token`) - query param where token should be placed +* `tokens` (required, []string) - list of allowed tokens + +Example minimal unit config with 3 tokens + +```yaml +command: 'echo hello world' +authorization: + query_token: + enable: yes + tokens: + - my-token-1 + - his-token-2 + - deadbeaf +``` + +## Header token + +*section: `authorization.header_token`* + +Plain token in a header. Will be matched against list of allowed tokens. + +For example, client can invoke endpoint by curl: + + curl -H 'X-Api-Token: deadbeaf' http://example.com/app/ + +Configurable parameters: + +* `header` (optional, string, default: `X-Api-Token`) - header name where token should be placed +* `tokens` (required, []string) - list of allowed tokens + +Example minimal unit config with 3 tokens + +```yaml +command: 'echo hello world' +authorization: + header_token: + enable: yes + tokens: + - my-token-1 + - his-token-2 + - deadbeaf +``` + +## Basic + +*section: `authorization.basic`* + +Basic authentication. [Overview](https://en.wikipedia.org/wiki/Basic_access_authentication) + +For example, client can invoke endpoint by curl: + + curl -u 'alice:admin' http://example.com/app/ + +To [calculate](https://unix.stackexchange.com/a/419855) hash you may use `htpasswd` (Debian/Ubuntu: `sudo apt install apache2-utils`) + + htpasswd -bnBC 10 "" password | tr -d ':' + +where `passsword` is a desired password for the user. + +Configurable parameters: + +* `users` (string->string, required) - map of users and their hashed password by bcrypt + +Example minimal config: + +```yaml +command: 'echo hello world' +authorization: + basic: + enable: yes + users: + alice: '$2y$10$cUe3n8NHaxee.AaGzT8wF.nirPnjv5YLEQGTsLiiMiUAknM2aF2FS' + bob: '$2y$10$iSczi.MlKTrMv3h0Zf.GDeW1NS6ZWxBgtj4ytrKKDrR2s2wIxq5Qa' +``` diff --git a/_docs/docker.md b/_docs/docker.md new file mode 100644 index 0000000..df3f985 --- /dev/null +++ b/_docs/docker.md @@ -0,0 +1,38 @@ +# Docker + + +Check images in [releases](https://github.com/reddec/nano-run/releases) + +* Latest one: `reddec/nano-run:latest` + + +Create Dockerfile inherited from the image and copy configuration and binaries + +## Minimal example + +**app.yaml** +```yaml +command: '/mybinary --with --some args' +``` + +**Dockerfile** +```dockerfile +FROM reddec/nano-run +COPY app.yaml /conf.d/app.yaml +COPY mybinary /mybinary +``` + +**Build & Run** + +```bash +docker run --rm -p 127.0.0.1:8080:80 $(docker build -q .) +``` + +Check it's working by sending test request + +``` +curl -v -X POST "http://127.0.0.1:8080/app/" +``` + +* To keep tasks persistent - mount `/data` volume like: +`docker run -v $(pwd)/data:data ...` \ No newline at end of file diff --git a/_docs/flow.md b/_docs/flow.md new file mode 100644 index 0000000..902cac2 --- /dev/null +++ b/_docs/flow.md @@ -0,0 +1,14 @@ +# High-level overview + +Subjects: + +* Client - the side which is making HTTP(S) request to the System +* System - instance of nano-run that routing request +* Worker - executable that implements business logic + +During restart - all incomplete tasks will queued again. + +![image](https://user-images.githubusercontent.com/6597086/92712138-d8b58580-f38b-11ea-8a26-251df5c4ae13.png) + +![image](https://user-images.githubusercontent.com/6597086/92578247-3085bb00-f2be-11ea-87de-e2c9d94a21fa.png) + diff --git a/_docs/flow.txt b/_docs/flow.txt new file mode 100644 index 0000000..dbb19c1 --- /dev/null +++ b/_docs/flow.txt @@ -0,0 +1,19 @@ +title Request processing + +Client->System: HTTP(s) request +System->System: Save request +System->Client: Correlation ID +loop attempts +System->Worker: Request +alt request failed +Worker->System: failed +System->System: requeue +else success +Worker->System: success +end +System->System: save attemp +end + +System->System: mark request as complete +Client->System: get result +System->Client: info \ No newline at end of file diff --git a/_docs/goals.md b/_docs/goals.md new file mode 100644 index 0000000..2aa9b08 --- /dev/null +++ b/_docs/goals.md @@ -0,0 +1,12 @@ +# Goals + +* Minimal requirements for host; +* Should have semi-constant resource consumption regardless of: + * number of requests, + * size of requests, + * kind of requests; +* Should be ready to run without configuration; +* Should be ready for deploying in clouds; +* Should support extending for another providers; +* Can be used as library and as a complete solution; +* Performance has less priority than resource usage. \ No newline at end of file diff --git a/_docs/internals/worker_flow b/_docs/internals/worker_flow new file mode 100644 index 0000000..10c0a2a --- /dev/null +++ b/_docs/internals/worker_flow @@ -0,0 +1 @@ +7Vzdc5s4EP9rPNN76A3f4MdzmjQP6U2veWjvUQbZpgZEhRzb99efBMJ8CNtyEiNIM9OZWgJ9sLu/3dXuKhPzJt59xiBdfUEBjCaGFuwm5qeJYeiWYUzYPy3YFz2uNS06ljgM+EtVx2P4H+SdGu/dhAHMGi8ShCISps1OHyUJ9EmjD2CMts3XFihqrpqCJRQ6Hn0Qib3fw4Csil7PcKv+exguV+XKusO/Lwbly/xLshUI0LbWZd5OzBuMECl+xbsbGDHilXQpxt0deXrYGIYJkRnw/Uv02VqvH2ffluv7qf3wz0OQfHStYponEG34F2P48dcG0halVkQ3XGye7EuKZNswjkBCW7OMYLSGNyhCOH9mBs7csR32hL90F0ZR+TxB+aBFrYsOWSwWhu/T/gBkK8j2qvPGV0AIxEneY2iUXjO+V4gJ3B2lgn6gLRVKiGJI8J6+wgfYFmcHl0ezbG8r7poly1Y1ztqlRAIuUcvD3IflvlEJBMkygtV6ltlcz9HF9aYdyxlmczUQMVoAAmdokwRZndP0R+1Lq66c/xfIgmN2yEIpChkBlOqBIA2YbefAtu0qJPAxBT57uqUagfatSBzxx1L8OyGnR7naInIHT/UunjonWNqg7sWktAVSpigfRkC2ZsjCKKb/ldT9UILujxME1oZDYF09hQ2BUjCgmps3ESYrtEQJiG6r3lmTltU7DwilnII/ISF7bobAhqAmfeEuJD/Y8D9t3vq39uTTjs+cN/a8UeyTbe45HKFfiDbYhxK4pQhdwlMzFlIp8hjDCJDwqbm/1+eYK2Ai2/g+zOhCd6LYr1A832QKRd5uivzBjtdE3uuQeO9qEm8JRPptJd6WlXhXqcR7Jw0qStPBGlTdcwZmUR2BlglSCQi9BocKHN2AoKzA+xqKWPMAI9aohuWt/oDkygLJUwqkqcD8LQhJfp7D+S5+bULMwKSRMIbj8KG6Th79YsrVBLLuYTYWK3MMVNp4QDVVCSrXVMrpZ6nPBp8rtqvitKvLctpUymldwDlMSj+EMogFq8KkOo6OQXse4irqtOdoTqDD1ZRTSfy4R4J6PeGnw/1AeA2Z61GESLUPOX4oZUAU5RHwvO2jxN9gSi1/z12TTAzw1OKpEiHQVvQ0sKEXWF2hWM+Ym47zOkHTFvY6TsJWVwxzal0Le7p4GjhwRHWk8ohRL6eZWk0vULUa08WwzKlQpSLbcBlRdfVUnY7FODxbyZdfeE7Jl1g9q+R1pcEaQzwMUZ0KQczPlzAjFRgygjBLHI4DDqZyOBhiailMnqjBYhsASRAxzT0KWlq6rZqWo4kD9+53yqqkkl9nVZKh9NxmiKa5RI0T0W3P5hQ1zpL9UpxJOY0hQ29iqCsH3msqxRrQ0U2ThFBPoQ9ZCJmyoQ+1Vt0UQx8xwOu6TXdAzICQzLM0J1wbWj6K0wiSoRr7dn1Jx+GsX/tkibZedVT5gljj9W2PbHTYVBrzMA2Bixl4Yv4aC0nEKRkoHNr+mt1ha3r217zhGJuR+mulMJ7319QaG7UZlbfAaUO2lqkQiRdwmg/9isKE1FxVu3Xcc+3mFMUX8FGVvJydyJ629EvxhcJEr1XMabljsbjHXFxDvY8rW2b0Yh83H/oXxmBfeyFlspEdl1W96fl5U+2kRLZe1y27JXfF+q8qhaZYB1vLPBbxZZaAbPvctRr0McWdPUe1s2GJBH93vmvyKBuLVut8W0MrdnsTvoU885XWSZsi82OwmzA10laT/CiWdSlQ4Oep5DHEA92OSrhe44GmGGj9ezR4uzpu5CMWjkrc2GICT7XpGxIXLU+Si7XLiSpMn1gzP6rchqE8t/FeedAubTkr8pbS6nZbzFHUipnU3hI5k3to3ZFSXmdgi7mH91si/ZkOW+mpyRPLNO+P1Ja8qOZy4fkwv7Eu1FzOPduyNVnEXXZRvSOv13lP3dSuBS5PdLCqwiiAMWWwMkXlnbHS7rD0lCeqfJ5dqyg6R8Geh8e0eYTmE+VlZ5cRWVeeiS7vML5dV8iTVeElds+qcE+tCjeO4YLOD9joZIFKUPCuUYGi63jQMyjsd1C0tPB5UBhKQSE6tVUSpbIXdI4B3eY6AwvdGZyxUFpWe8k54fq4MGRxobQ81hOjtj7CxdooKRGBIdngZLCeaRsI6l3TAZUrydbG9nNelsePbBGLd6VyJdqs/q5ckcuv/jqfefs/ \ No newline at end of file diff --git a/_docs/unit.md b/_docs/unit.md new file mode 100644 index 0000000..da58894 --- /dev/null +++ b/_docs/unit.md @@ -0,0 +1,3 @@ +# Unit + +* If work dir is not defined - temporary directory will be created and removed after execution for each request automatically. \ No newline at end of file diff --git a/bundle/debian/nano-run.service b/bundle/debian/nano-run.service new file mode 100644 index 0000000..c94df6d --- /dev/null +++ b/bundle/debian/nano-run.service @@ -0,0 +1,12 @@ +[Unit] +Description=Lightweight async request processor + +[Service] +ExecStart=/usr/bin/nano-run server run -c /etc/nano-run/server.yaml +Restart=always +RestartSec=3 +User=nano-run +Group=nano-run + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/bundle/debian/postinstall.sh b/bundle/debian/postinstall.sh new file mode 100755 index 0000000..ecca49f --- /dev/null +++ b/bundle/debian/postinstall.sh @@ -0,0 +1,12 @@ +#!/bin/sh + +SERVICE="nano-run" +RUNNING_USER="${SERVICE}" + +if ! id -u ${RUNNING_USER}; then + echo "Creating user ${RUNNING_USER}..." + useradd -M -c "${RUNNING_USER} dummy user" -r -s /bin/nologin ${RUNNING_USER} +fi + +systemctl enable "${SERVICE}".service || echo "failed to enable service" +systemctl start "${SERVICE}".service || echo "failed to start service" diff --git a/bundle/debian/preremove.sh b/bundle/debian/preremove.sh new file mode 100755 index 0000000..7987f5f --- /dev/null +++ b/bundle/debian/preremove.sh @@ -0,0 +1,14 @@ +#!/bin/sh + +SERVICE="nano-run" +RUNNING_USER="${SERVICE}" + +systemctl stop "${SERVICE}".service || echo "failed to stop service" +systemctl disable "${SERVICE}".service || echo "failed to disable service" + +if id -u ${RUNNING_USER}; then + echo "Removing user ${RUNNING_USER}..." + userdel -r ${RUNNING_USER} +fi + + diff --git a/bundle/debian/server.yaml b/bundle/debian/server.yaml new file mode 100755 index 0000000..72c01a4 --- /dev/null +++ b/bundle/debian/server.yaml @@ -0,0 +1,6 @@ +# Location to store tasks, blobs and queues +working_directory: /var/nano-run +config_directory: /etc/nano-run/conf.d +bind: 127.0.0.1:8989 +graceful_shutdown: 5s +max_request: 1048576 diff --git a/bundle/docker/server.yaml b/bundle/docker/server.yaml new file mode 100644 index 0000000..430daef --- /dev/null +++ b/bundle/docker/server.yaml @@ -0,0 +1,5 @@ +working_directory: /data +config_directory: /conf.d +bind: ":80" +graceful_shutdown: 5s +max_request: 1048576 diff --git a/cmd/nano-run/main.go b/cmd/nano-run/main.go new file mode 100644 index 0000000..c209b48 --- /dev/null +++ b/cmd/nano-run/main.go @@ -0,0 +1,50 @@ +package main + +import ( + "context" + "os" + "os/signal" + + "github.com/jessevdk/go-flags" +) + +var ( + version = "dev" + commit = "dev" +) + +type Config struct { + Run runCmd `command:"run" description:"run single unit"` + Server serverCmd `command:"server" description:"manage server"` +} + +func main() { + if len(os.Args) == 1 { + os.Args = append(os.Args, "server", "run") + } + var cfg Config + parser := flags.NewParser(&cfg, flags.Default) + parser.LongDescription = "Async webhook processor with minimal system requirements.\n\n" + + "Author: Baryshnikov Aleksandr \n" + + "Source code: https://github.com/reddec/nano-run\n" + + "License: Apache 2.0\n" + + "Version: " + version + "\n" + + "Revision: " + commit + _, err := parser.Parse() + if err != nil { + os.Exit(1) + } +} + +func SignalContext() context.Context { + gctx, closer := context.WithCancel(context.Background()) + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, signals...) + for range c { + closer() + break + } + }() + return gctx +} diff --git a/cmd/nano-run/run_cmd.go b/cmd/nano-run/run_cmd.go new file mode 100644 index 0000000..c542439 --- /dev/null +++ b/cmd/nano-run/run_cmd.go @@ -0,0 +1,65 @@ +package main + +import ( + "io/ioutil" + "os" + "path/filepath" + runtime "runtime" + "strconv" + "strings" + "time" + + "nano-run/server" +) + +type runCmd struct { + Directory string `long:"directory" short:"d" env:"DIRECTORY" description:"Data directory" default:"run"` + Interval time.Duration `long:"interval" short:"i" env:"INTERVAL" description:"Requeue interval" default:"3s"` + Attempts int `long:"attempts" short:"a" env:"ATTEMPTS" description:"Max number of attempts" default:"5"` + Concurrency int `long:"concurrency" short:"c" env:"CONCURRENCY" description:"Number of parallel worker (0 - mean number of CPU)" default:"0"` + Mode string `long:"mode" short:"m" env:"MODE" description:"Running mode" default:"bin" choice:"bin" choice:"cgi" choice:"proxy"` + Bind string `long:"bind" short:"b" env:"BIND" description:"Binding address" default:"127.0.0.1:8989"` + Args struct { + Executable string `arg:"executable" description:"path to binary to invoke or url" required:"yes"` + Args []string `arg:"args" description:"executable args"` + } `positional-args:"yes"` +} + +func (cfg *runCmd) Execute([]string) error { + tmpDir, err := ioutil.TempDir("", "") + if err != nil { + return err + } + defer os.RemoveAll(tmpDir) + srv := server.DefaultConfig() + srv.Bind = cfg.Bind + srv.WorkingDirectory = cfg.Directory + srv.ConfigDirectory = tmpDir + + unit := server.DefaultUnit() + + var params []string + params = append(params, strconv.Quote(cfg.Args.Executable)) + for _, arg := range cfg.Args.Args { + params = append(params, strconv.Quote(arg)) + } + unit.Command = strings.Join(params, " ") + unit.WorkDir, _ = os.Getwd() + unit.Attempts = cfg.Attempts + unit.Interval = cfg.Interval + unit.Workers = cfg.concurrency() + unit.Mode = cfg.Mode + + err = unit.SaveFile(filepath.Join(tmpDir, "main.yaml")) + if err != nil { + return err + } + return srv.Run(SignalContext()) +} + +func (cfg runCmd) concurrency() int { + if cfg.Concurrency <= 0 { + return runtime.NumCPU() + } + return cfg.Concurrency +} diff --git a/cmd/nano-run/server_cmd.go b/cmd/nano-run/server_cmd.go new file mode 100644 index 0000000..3eb8c6b --- /dev/null +++ b/cmd/nano-run/server_cmd.go @@ -0,0 +1,71 @@ +package main + +import ( + "log" + "os" + "path/filepath" + + "nano-run/server" +) + +type serverCmd struct { + Run serverRunCmd `command:"run" description:"run server"` + Init serverInitCmd `command:"init" description:"initialize server"` +} + +type serverInitCmd struct { + Directory string `short:"d" long:"directory" env:"DIRECTORY" description:"Target directory" default:"server"` + ConfigFile string `long:"config-file" env:"CONFIG_FILE" description:"Config file name" default:"server.yaml"` + NoSample bool `long:"no-sample" env:"NO_SAMPLE" description:"Do not create same file"` +} + +func (cmd *serverInitCmd) Execute([]string) error { + err := os.MkdirAll(cmd.Directory, 0755) + if err != nil { + return err + } + cfg := server.DefaultConfig() + err = cfg.SaveFile(filepath.Join(cmd.Directory, cmd.ConfigFile)) + if err != nil { + return err + } + err = os.MkdirAll(filepath.Join(cmd.Directory, cfg.ConfigDirectory), 0755) + if err != nil { + return err + } + err = os.MkdirAll(filepath.Join(cmd.Directory, cfg.WorkingDirectory), 0755) + if err != nil { + return err + } + if !cmd.NoSample { + unit := server.DefaultUnit() + err = unit.SaveFile(filepath.Join(cmd.Directory, cfg.ConfigDirectory, "sample.yaml")) + if err != nil { + return err + } + } + return nil +} + +type serverRunCmd struct { + Fail bool `short:"f" long:"fail" env:"FAIL" description:"Fail if no config file"` + Config string `short:"c" long:"config" env:"CONFIG" description:"Configuration file" default:"server.yaml"` +} + +func (cmd *serverRunCmd) Execute([]string) error { + cfg := server.DefaultConfig() + err := cfg.LoadFile(cmd.Config) + if os.IsNotExist(err) && !cmd.Fail { + log.Println("no config file found - using transient default configuration") + cfg.ConfigDirectory = filepath.Join("run", "conf.d") + cfg.WorkingDirectory = filepath.Join("run", "data") + err := cfg.CreateDirs() + if err != nil { + return err + } + } else if err != nil { + return err + } + log.Println("configuration loaded") + return cfg.Run(SignalContext()) +} diff --git a/cmd/nano-run/signals_default.go b/cmd/nano-run/signals_default.go new file mode 100644 index 0000000..7b68308 --- /dev/null +++ b/cmd/nano-run/signals_default.go @@ -0,0 +1,9 @@ +// +build !darwin,!linux + +package main + +import ( + "os" +) + +var signals = []os.Signal{os.Interrupt} diff --git a/cmd/nano-run/signals_posix.go b/cmd/nano-run/signals_posix.go new file mode 100644 index 0000000..058898c --- /dev/null +++ b/cmd/nano-run/signals_posix.go @@ -0,0 +1,10 @@ +// +build linux darwin + +package main + +import ( + "os" + "syscall" +) + +var signals = []os.Signal{syscall.SIGTERM, os.Interrupt} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4363e62 --- /dev/null +++ b/go.mod @@ -0,0 +1,13 @@ +module nano-run + +go 1.14 + +require ( + github.com/dgraph-io/badger v1.6.1 + github.com/dgrijalva/jwt-go v3.2.0+incompatible + github.com/google/uuid v1.1.2 + github.com/gorilla/mux v1.8.0 + github.com/jessevdk/go-flags v1.4.1-0.20200711081900-c17162fe8fd7 + golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a + gopkg.in/yaml.v2 v2.3.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d356f2c --- /dev/null +++ b/go.sum @@ -0,0 +1,86 @@ +github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkByhMAwYaFAX9w2l7vxvBQ5NMoxDrkhqhtn4= +github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger v1.6.1 h1:w9pSFNSdq/JPM1N12Fz/F/bzo993Is1W+Q7HjPzi7yg= +github.com/dgraph-io/badger v1.6.1/go.mod h1:FRmFw3uxvcpa8zG3Rxs0th+hCLIuaQg8HlNV5bjgnuU= +github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po= +github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jessevdk/go-flags v1.4.1-0.20200711081900-c17162fe8fd7 h1:Ug59miTxVKVg5Oi2S5uHlKOIV5jBx4Hb2u0jIxxDaSs= +github.com/jessevdk/go-flags v1.4.1-0.20200711081900-c17162fe8fd7/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a h1:vclmkQCjlDX5OydZ9wv8rBCcS0QyQY66Mpf/7BZbInM= +golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb h1:fgwFCsaw9buMuxNd6+DQfAuSFqbNiQZpcgJQAgJsK6k= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/nano_logger.go b/internal/nano_logger.go new file mode 100644 index 0000000..0fe7df9 --- /dev/null +++ b/internal/nano_logger.go @@ -0,0 +1,31 @@ +package internal + +import ( + "log" + + "github.com/dgraph-io/badger" +) + +func NanoLogger(wrap *log.Logger) badger.Logger { + return &nanoLogger{logger: wrap} +} + +type nanoLogger struct { + logger *log.Logger +} + +func (nl *nanoLogger) Errorf(s string, i ...interface{}) { + nl.logger.Printf("[error] "+s, i...) +} + +func (nl *nanoLogger) Warningf(s string, i ...interface{}) { + nl.logger.Printf("[warn] "+s, i...) +} + +func (nl *nanoLogger) Infof(s string, i ...interface{}) { + nl.logger.Printf("[info] "+s, i...) +} + +func (nl *nanoLogger) Debugf(s string, i ...interface{}) { + nl.logger.Printf("[debug] "+s, i...) +} diff --git a/server/auth.go b/server/auth.go new file mode 100644 index 0000000..9ed6e9c --- /dev/null +++ b/server/auth.go @@ -0,0 +1,129 @@ +package server + +import ( + "errors" + "net/http" + + "github.com/dgrijalva/jwt-go" + "golang.org/x/crypto/bcrypt" +) + +func (cfg Unit) enableAuthorization() func(handler http.Handler) http.Handler { + var handlers []AuthHandlerFunc + if cfg.Authorization.JWT.Enable { + handlers = append(handlers, cfg.Authorization.JWT.Create()) + } + if cfg.Authorization.QueryToken.Enable { + handlers = append(handlers, cfg.Authorization.QueryToken.Create()) + } + if cfg.Authorization.HeaderToken.Enable { + handlers = append(handlers, cfg.Authorization.HeaderToken.Create()) + } + if cfg.Authorization.Basic.Enable { + handlers = append(handlers, cfg.Authorization.Basic.Create()) + } + if len(handlers) == 0 { + return func(handler http.Handler) http.Handler { + return handler + } + } + return func(handler http.Handler) http.Handler { + return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + var authorized bool + for _, h := range handlers { + if h(request) { + authorized = true + break + } + } + if !authorized { + writer.WriteHeader(http.StatusForbidden) + return + } + handler.ServeHTTP(writer, request) + }) + } +} + +type AuthHandlerFunc func(req *http.Request) bool + +type JWT struct { + Header string `yaml:"header"` // JWT header - by default Authorization + Secret string `yaml:"secret"` // key to verify JWT +} + +func (cfg JWT) Create() AuthHandlerFunc { + header := cfg.Header + if header == "" { + header = "Authorization" + } + + return func(req *http.Request) bool { + rawToken := req.Header.Get(header) + t, err := jwt.Parse(rawToken, func(token *jwt.Token) (interface{}, error) { + if token.Method != jwt.SigningMethodHS256 { + return nil, errors.New("unknown method") + } + return []byte(cfg.Secret), nil + }) + return err == nil && t.Valid + } +} + +type QueryToken struct { + Param string `yaml:"param"` // query name - by default 'token' + Tokens []string `yaml:"tokens"` // allowed tokens +} + +func (cfg QueryToken) Create() AuthHandlerFunc { + param := cfg.Param + if param == "" { + param = "token" + } + tokens := map[string]bool{} + for _, k := range cfg.Tokens { + tokens[k] = true + } + return func(req *http.Request) bool { + token := req.URL.Query().Get(param) + return tokens[token] + } +} + +type HeaderToken struct { + Header string `yaml:"header"` // header name - by default X-Api-Token + Tokens []string `yaml:"tokens"` // allowed tokens +} + +func (cfg HeaderToken) Create() AuthHandlerFunc { + header := cfg.Header + if header == "" { + header = "X-Api-Token" + } + tokens := map[string]bool{} + for _, k := range cfg.Tokens { + tokens[k] = true + } + return func(req *http.Request) bool { + token := req.URL.Query().Get(header) + return tokens[token] + } +} + +type Basic struct { + Users map[string]string `yaml:"users"` // users -> bcrypted password map +} + +func (cfg Basic) Create() AuthHandlerFunc { + return func(req *http.Request) bool { + u, p, ok := req.BasicAuth() + if !ok { + return false + } + h, ok := cfg.Users[u] + if !ok { + return false + } + return bcrypt.CompareHashAndPassword([]byte(h), []byte(p)) == nil + } +} diff --git a/server/internal/adapter.go b/server/internal/adapter.go new file mode 100644 index 0000000..7d18312 --- /dev/null +++ b/server/internal/adapter.go @@ -0,0 +1,192 @@ +package internal + +import ( + "encoding/json" + "io" + "log" + "net/http" + "strconv" + "time" + + "github.com/gorilla/mux" + + "nano-run/services/meta" + "nano-run/worker" +) + +// Expose worker as HTTP handler: +// POST / - post task async, returns 303 See Other and location. +// GET /:id - get task info. +// GET /:id/completed - redirect to completed attempt (or 404 if task not yet complete) +// GET /:id/attempt/:atid - get attempt result (as-is). +// GET /:id/request - replay request (as-is). +func Expose(router *mux.Router, wrk *worker.Worker) { + //TODO: wait + router.Path("/").Methods("POST").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + id, err := wrk.Enqueue(request) + if err != nil { + log.Println("failed to enqueue:", err) + http.Error(writer, "failed to enqueue", http.StatusInternalServerError) + return + } + writer.Header().Set("X-Correlation-Id", id) + http.Redirect(writer, request, id, http.StatusSeeOther) + }) + // get state: 200 with json description. For complete request - Location header will be filled. + router.Path("/{id}").Methods("GET").HandlerFunc(createTask(wrk)) + router.Path("/{id}/completed").Methods("GET").HandlerFunc(getComplete(wrk)) + // get attempt result as-is. + router.Path("/{id}/attempt/{attemptId}").Methods("GET").HandlerFunc(getAttempt(wrk)) + // get recorded request. + router.Path("/{id}/request").Methods("GET").HandlerFunc(getRequest(wrk)) +} + +// get request meta information. +func createTask(wrk *worker.Worker) http.HandlerFunc { + return func(writer http.ResponseWriter, request *http.Request) { + params := mux.Vars(request) + requestID := params["id"] + info, err := wrk.Meta().Get(requestID) + if err != nil { + log.Println("failed access request", requestID, ":", err) + http.NotFound(writer, request) + return + } + data, err := json.Marshal(info) + if err != nil { + log.Println("failed to encode info:", err) + http.Error(writer, "encoding", http.StatusInternalServerError) + return + } + writer.Header().Set("Content-Type", "application/json") + writer.Header().Set("Content-Length", strconv.Itoa(len(data))) + writer.Header().Set("Content-Version", strconv.Itoa(len(info.Attempts))) + // modification time + setLastModify(writer, info) + + writer.Header().Set("Age", strconv.FormatInt(int64(time.Since(info.CreatedAt)/time.Second), 10)) + if info.Complete { + writer.Header().Set("X-Status", "complete") + } else { + writer.Header().Set("X-Status", "processing") + } + + if len(info.Attempts) > 0 { + writer.Header().Set("X-Last-Attempt", info.Attempts[len(info.Attempts)-1].ID) + writer.Header().Set("X-Last-Attempt-At", info.Attempts[len(info.Attempts)-1].CreatedAt.Format(time.RFC850)) + } + + if info.Complete { + lastAttempt := info.Attempts[len(info.Attempts)-1] + request.URL.Path += "/attempt/" + lastAttempt.ID + writer.Header().Set("Location", request.URL.String()) + } + writer.WriteHeader(http.StatusOK) + _, _ = writer.Write(data) + } +} + +func getComplete(wrk *worker.Worker) http.HandlerFunc { + return func(writer http.ResponseWriter, request *http.Request) { + params := mux.Vars(request) + requestID := params["id"] + info, err := wrk.Meta().Get(requestID) + if err != nil { + log.Println("failed access request", requestID, ":", err) + http.NotFound(writer, request) + return + } + if !info.Complete { + http.NotFound(writer, request) + return + } + lastAttempt := info.Attempts[len(info.Attempts)-1] + http.Redirect(writer, request, "attempt/"+lastAttempt.ID, http.StatusMovedPermanently) + } +} + +func getAttempt(wrk *worker.Worker) http.HandlerFunc { + return func(writer http.ResponseWriter, request *http.Request) { + params := mux.Vars(request) + requestID := params["id"] + attemptID := params["attemptId"] + info, err := wrk.Meta().Get(requestID) + if err != nil { + log.Println("failed access request", requestID, ":", err) + http.NotFound(writer, request) + return + } + var attempt meta.Attempt + var found bool + for _, atp := range info.Attempts { + if atp.ID == attemptID { + found = true + attempt = atp + break + } + } + if !found { + http.NotFound(writer, request) + return + } + body, err := wrk.Blobs().Get(attempt.ID) + if err != nil { + log.Println("failed to get body:", err) + http.Error(writer, "get body", http.StatusInternalServerError) + return + } + defer body.Close() + writer.Header().Set("Last-Modified", attempt.CreatedAt.Format(time.RFC850)) + if info.Complete { + writer.Header().Set("X-Status", "complete") + } else { + writer.Header().Set("X-Status", "processing") + } + writer.Header().Set("X-Processed", "true") + for k, v := range attempt.Headers { + writer.Header()[k] = v + } + writer.WriteHeader(attempt.Code) + _, _ = io.Copy(writer, body) + } +} + +func getRequest(wrk *worker.Worker) http.HandlerFunc { + return func(writer http.ResponseWriter, request *http.Request) { + params := mux.Vars(request) + requestID := params["id"] + info, err := wrk.Meta().Get(requestID) + if err != nil { + log.Println("failed access request", requestID, ":", err) + http.NotFound(writer, request) + return + } + writer.Header().Set("Last-Modified", info.CreatedAt.Format(time.RFC850)) + f, err := wrk.Blobs().Get(requestID) + if err != nil { + log.Println("failed to get data:", err) + http.Error(writer, "data", http.StatusInternalServerError) + return + } + defer f.Close() + + writer.Header().Set("X-Method", info.Method) + writer.Header().Set("X-Request-Uri", info.URI) + + for k, v := range info.Headers { + writer.Header()[k] = v + } + writer.WriteHeader(http.StatusOK) + _, _ = io.Copy(writer, f) + } +} + +func setLastModify(writer http.ResponseWriter, info *meta.Request) { + if info.Complete { + writer.Header().Set("Last-Modified", info.CompleteAt.Format(time.RFC850)) + } else if len(info.Attempts) > 0 { + writer.Header().Set("Last-Modified", info.Attempts[len(info.Attempts)-1].CreatedAt.Format(time.RFC850)) + } else { + writer.Header().Set("Last-Modified", info.CreatedAt.Format(time.RFC850)) + } +} diff --git a/server/internal/flags.go b/server/internal/flags.go new file mode 100644 index 0000000..7278639 --- /dev/null +++ b/server/internal/flags.go @@ -0,0 +1,9 @@ +// +build !linux + +package internal + +import "os/exec" + +func SetBinFlags(cmd *exec.Cmd) { + +} diff --git a/server/internal/flags_linux.go b/server/internal/flags_linux.go new file mode 100644 index 0000000..5587dab --- /dev/null +++ b/server/internal/flags_linux.go @@ -0,0 +1,14 @@ +package internal + +import ( + "os/exec" + "syscall" +) + +func SetBinFlags(cmd *exec.Cmd) { + if cmd.SysProcAttr == nil { + cmd.SysProcAttr = &syscall.SysProcAttr{} + } + cmd.SysProcAttr.Pdeathsig = syscall.SIGTERM + cmd.SysProcAttr.Setpgid = true +} diff --git a/server/mode_bin.go b/server/mode_bin.go new file mode 100644 index 0000000..ce467c2 --- /dev/null +++ b/server/mode_bin.go @@ -0,0 +1,96 @@ +package server + +import ( + "context" + "io/ioutil" + "net/http" + "os" + "os/exec" + "strconv" + "strings" + "time" + + "nano-run/server/internal" +) + +type markerResponse struct { + dataSent bool + res http.ResponseWriter +} + +func (m *markerResponse) Header() http.Header { + return m.res.Header() +} + +func (m *markerResponse) Write(bytes []byte) (int, error) { + m.dataSent = true + return m.res.Write(bytes) +} + +func (m *markerResponse) WriteHeader(statusCode int) { + m.dataSent = true + m.res.WriteHeader(statusCode) +} + +type binHandler struct { + command string + workDir string + shell string + environment []string + timeout time.Duration +} + +func (bh *binHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + marker := &markerResponse{res: writer} + + ctx := request.Context() + if bh.timeout > 0 { + c, cancel := context.WithTimeout(ctx, bh.timeout) + defer cancel() + ctx = c + } + + cmd := exec.CommandContext(ctx, bh.shell, "-c", bh.command) //nolint:gosec + + if bh.workDir == "" { + tmpDir, err := ioutil.TempDir("", "") + if err != nil { + http.Error(writer, err.Error(), http.StatusInternalServerError) + return + } + defer os.RemoveAll(tmpDir) + cmd.Dir = tmpDir + } else { + cmd.Dir = bh.workDir + } + + var env = bh.cloneEnv() + for k, v := range request.Header { + ke := strings.ToUpper(strings.Replace(k, "-", "_", -1)) + env = append(env, ke+"="+strings.Join(v, ",")) + } + + cmd.Stderr = os.Stderr + cmd.Stdin = request.Body + cmd.Stdout = marker + cmd.Env = env + internal.SetBinFlags(cmd) + err := cmd.Run() + + if marker.dataSent { + return + } + if err != nil { + writer.Header().Set("X-Return-Code", strconv.Itoa(cmd.ProcessState.ExitCode())) + writer.WriteHeader(http.StatusBadGateway) + } else { + writer.Header().Set("X-Return-Code", strconv.Itoa(cmd.ProcessState.ExitCode())) + writer.WriteHeader(http.StatusNoContent) + } +} + +func (bh *binHandler) cloneEnv() []string { + var cp = make([]string, len(bh.environment)) + copy(cp, bh.environment) + return cp +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..4e5d463 --- /dev/null +++ b/server/server.go @@ -0,0 +1,144 @@ +package server + +import ( + "context" + "io" + "io/ioutil" + "log" + "net/http" + "os" + "path/filepath" + "sync" + "time" + + "gopkg.in/yaml.v2" +) + +type Config struct { + WorkingDirectory string `yaml:"working_directory"` + ConfigDirectory string `yaml:"config_directory"` + Bind string `yaml:"bind"` + GracefulShutdown time.Duration `yaml:"graceful_shutdown"` + TLS struct { + Enable bool `yaml:"enable"` + Cert string `yaml:"cert"` + Key string `yaml:"key"` + } `yaml:"tls,omitempty"` +} + +const ( + defaultGracefulShutdown = 5 * time.Second + defaultBind = "127.0.0.1:8989" +) + +func DefaultConfig() Config { + var cfg Config + cfg.Bind = defaultBind + cfg.WorkingDirectory = filepath.Join("run") + cfg.ConfigDirectory = filepath.Join("conf.d") + cfg.GracefulShutdown = defaultGracefulShutdown + return cfg +} + +func (cfg Config) CreateDirs() error { + err := os.MkdirAll(cfg.WorkingDirectory, 0755) + if err != nil { + return err + } + return os.MkdirAll(cfg.ConfigDirectory, 0755) +} + +func (cfg *Config) LoadFile(file string) error { + data, err := ioutil.ReadFile(file) + if err != nil { + return err + } + err = yaml.Unmarshal(data, cfg) + if err != nil { + return err + } + if !filepath.IsAbs(cfg.WorkingDirectory) { + cfg.WorkingDirectory = filepath.Join(filepath.Dir(file), cfg.WorkingDirectory) + } + if !filepath.IsAbs(cfg.ConfigDirectory) { + cfg.ConfigDirectory = filepath.Join(filepath.Dir(file), cfg.ConfigDirectory) + } + return nil +} + +func (cfg Config) SaveFile(file string) error { + data, err := yaml.Marshal(cfg) + if err != nil { + return err + } + return ioutil.WriteFile(file, data, 0600) +} + +func (cfg Config) Run(global context.Context) error { + units, err := Units(cfg.ConfigDirectory) + if err != nil { + return err + } + workers, err := Workers(cfg.WorkingDirectory, units) + if err != nil { + return err + } + defer func() { + for _, wrk := range workers { + wrk.Close() + } + }() + handler := Handler(units, workers) + + ctx, cancel := context.WithCancel(global) + + server := http.Server{ + Addr: cfg.Bind, + Handler: handler, + } + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + <-ctx.Done() + t, c := context.WithTimeout(context.Background(), cfg.GracefulShutdown) + _ = server.Shutdown(t) + c() + }() + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + err := Run(ctx, workers) + if err != nil { + log.Println("workers stopped:", err) + } + }() + if cfg.TLS.Enable { + err = server.ListenAndServeTLS(cfg.TLS.Cert, cfg.TLS.Key) + } else { + err = server.ListenAndServe() + } + cancel() + wg.Wait() + return err +} + +func limitRequest(maxSize int64, handler http.Handler) http.Handler { + return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + body := request.Body + defer body.Close() + if request.ContentLength > maxSize { + http.Error(writer, "too big request", http.StatusBadRequest) + return + } + + limiter := io.LimitReader(request.Body, maxSize) + request.Body = ioutil.NopCloser(limiter) + handler.ServeHTTP(writer, request) + }) +} diff --git a/server/unit.go b/server/unit.go new file mode 100644 index 0000000..ee035c2 --- /dev/null +++ b/server/unit.go @@ -0,0 +1,265 @@ +package server + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/http/cgi" //nolint:gosec + "net/http/httputil" + "net/url" + "os" + "path/filepath" + "runtime" + "strings" + "sync" + "time" + + "github.com/gorilla/mux" + "gopkg.in/yaml.v2" + + "nano-run/server/internal" + "nano-run/worker" +) + +type Unit struct { + Interval time.Duration `yaml:"interval,omitempty"` // interval between attempts + Attempts int `yaml:"attempts,omitempty"` // maximum number of attempts + Workers int `yaml:"workers,omitempty"` // concurrency level - number of parallel requests + Mode string `yaml:"mode,omitempty"` // execution mode: bin, cgi or proxy + WorkDir string `yaml:"workdir,omitempty"` // working directory for the worker. if empty - temporary one will generated automatically + Command string `yaml:"command"` // command in a shell to execute + Timeout time.Duration `yaml:"timeout,omitempty"` // maximum execution timeout (enabled only for bin mode and only if positive) + Shell string `yaml:"shell,omitempty"` // shell to execute command in bin mode (default - /bin/sh) + Environment map[string]string `yaml:"environment,omitempty"` // custom environment for executable (in addition to system) + MaxRequest int64 `yaml:"max_request,omitempty"` // optional maximum HTTP body size (enabled if positive) + Authorization struct { + JWT struct { + Enable bool `yaml:"enable"` // enable JWT verification + JWT `yaml:",inline"` + } `yaml:"jwt,omitempty"` // HMAC256 JWT verification with shared secret + + QueryToken struct { + Enable bool `yaml:"enable"` // enable query-based token access + QueryToken `yaml:",inline"` + } `yaml:"query_token,omitempty"` // plain API tokens in request query params + + HeaderToken struct { + Enable bool `yaml:"enable"` // enable header-based token access + HeaderToken `yaml:",inline"` + } `yaml:"header_token,omitempty"` // plain API tokens in request header + + Basic struct { + Enable bool `yaml:"enable"` // enable basic verification + Basic `yaml:",inline"` + } `yaml:"basic,omitempty"` // basic authorization + } `yaml:"authorization,omitempty"` + name string +} + +const ( + defaultRequestSize = 1 * 1024 * 1024 // 1MB + defaultAttempts = 3 + defaultInterval = 5 * time.Second + defaultWorkers = 1 + defaultShell = "/bin/sh" + defaultMode = "bin" + defaultCommand = "echo hello world" + defaultName = "main" +) + +func DefaultUnit() Unit { + return Unit{ + Interval: defaultInterval, + Attempts: defaultAttempts, + Workers: defaultWorkers, + MaxRequest: defaultRequestSize, + Shell: defaultShell, + Mode: defaultMode, + Command: defaultCommand, + name: defaultName, + } +} + +func (cfg Unit) Validate() error { + var checks []string + if cfg.Interval < 0 { + checks = append(checks, "negative interval") + } + if cfg.Attempts < 0 { + checks = append(checks, "negative attempts") + } + if cfg.Workers < 0 { + checks = append(checks, "negative workers amount") + } + if !(cfg.Mode == "bin" || cfg.Mode == "cgi" || cfg.Mode == "proxy") { + checks = append(checks, "unknown mode "+cfg.Mode) + } + if len(checks) == 0 { + return nil + } + return errors.New(strings.Join(checks, ", ")) +} + +func (cfg Unit) SaveFile(file string) error { + data, err := yaml.Marshal(cfg) + if err != nil { + return err + } + return ioutil.WriteFile(file, data, 0600) +} + +func Units(configsDir string) ([]Unit, error) { + var configs []Unit + err := filepath.Walk(configsDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + name := info.Name() + if !(strings.HasSuffix(name, ".yaml") || strings.HasSuffix(name, ".yml")) { + return nil + } + unitName := strings.ReplaceAll(strings.Trim(path[len(configsDir):strings.LastIndex(path, ".")], "/\\"), string(filepath.Separator), "-") + cfg := DefaultUnit() + cfg.name = unitName + data, err := ioutil.ReadFile(path) + if err != nil { + return err + } + err = yaml.Unmarshal(data, &cfg) + if err != nil { + return err + } + configs = append(configs, cfg) + return nil + }) + return configs, err +} + +func Workers(workdir string, configurations []Unit) ([]*worker.Worker, error) { + var ans []*worker.Worker + for _, cfg := range configurations { + log.Println("validating", cfg.name) + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("configuration invalid for %s: %w", cfg.name, err) + } + if cfg.Workers == 0 { + cfg.Workers = runtime.NumCPU() + } + wrk, err := cfg.worker(workdir) + if err != nil { + for _, w := range ans { + w.Close() + } + return nil, err + } + ans = append(ans, wrk) + } + return ans, nil +} + +func Handler(units []Unit, workers []*worker.Worker) http.Handler { + router := mux.NewRouter() + for i, unit := range units { + prefix := "/" + unit.name + "/" + subRouter := router.PathPrefix(prefix).Subrouter() + subRouter.Use(unit.enableAuthorization()) + internal.Expose(subRouter, workers[i]) + } + return router +} + +func Run(global context.Context, workers []*worker.Worker) error { + if len(workers) == 0 { + <-global.Done() + return global.Err() + } + + ctx, cancel := context.WithCancel(global) + defer cancel() + var wg sync.WaitGroup + + for _, wrk := range workers { + wg.Add(1) + go func(wrk *worker.Worker) { + err := wrk.Run(ctx) + if err != nil { + log.Println("failed:", err) + } + wg.Done() + }(wrk) + } + + wg.Wait() + return ctx.Err() +} + +func (cfg Unit) worker(root string) (*worker.Worker, error) { + handler, err := cfg.handler() + if err != nil { + return nil, err + } + workdir := filepath.Join(root, cfg.name) + wrk, err := worker.Default(workdir) + if err != nil { + return nil, err + } + wrk = wrk.Attempts(cfg.Attempts).Interval(cfg.Interval).Concurrency(cfg.Workers).Handler(handler) + return wrk, nil +} + +func (cfg Unit) handler() (http.Handler, error) { + handler, err := cfg.createRunner() + if err != nil { + return nil, err + } + if cfg.MaxRequest > 0 { + handler = limitRequest(cfg.MaxRequest, handler) + } + //TODO: add authorization + return handler, nil +} + +func (cfg Unit) createRunner() (http.Handler, error) { + switch cfg.Mode { + case "bin": + return &binHandler{ + command: cfg.Command, + workDir: cfg.WorkDir, + shell: cfg.Shell, + timeout: cfg.Timeout, + environment: append(os.Environ(), makeEnvList(cfg.Environment)...), + }, nil + case "cgi": + return &cgi.Handler{ + Path: cfg.Shell, + Dir: cfg.WorkDir, + Env: append(os.Environ(), makeEnvList(cfg.Environment)...), + Logger: log.New(os.Stderr, "[cgi] ", log.LstdFlags), + Args: []string{"-c", cfg.Command}, + Stderr: os.Stderr, + }, nil + case "proxy": + // proxy to static URL + u, err := url.Parse(cfg.Command) + if err != nil { + return nil, err + } + return httputil.NewSingleHostReverseProxy(u), nil + default: + return nil, fmt.Errorf("unknown mode %s", cfg.Mode) + } +} + +func makeEnvList(content map[string]string) []string { + var ans = make([]string, 0, len(content)) + for k, v := range content { + ans = append(ans, k+"="+v) + } + return ans +} diff --git a/services/blob/fsblob/impl.go b/services/blob/fsblob/impl.go new file mode 100644 index 0000000..ce89899 --- /dev/null +++ b/services/blob/fsblob/impl.go @@ -0,0 +1,80 @@ +package fsblob + +import ( + "fmt" + "io" + "os" + "path/filepath" + + "nano-run/services/blob" +) + +// Dummy file storage: large storage based on file system. +// ID - just name of file, but content will be written atomically. +func New(rootDir string) blob.Blob { + return &fsBlob{ + rootDir: rootDir, + checker: func(id string) bool { + return true + }, + } +} + +func NewCheck(rootDir string, checkFn func(string) bool) blob.Blob { + return &fsBlob{ + rootDir: rootDir, + checker: checkFn, + } +} + +type fsBlob struct { + rootDir string + checker func(id string) bool +} + +func (k *fsBlob) Push(id string, handler func(out io.Writer) error) error { + if !k.checker(id) { + return fmt.Errorf("push: invalid id %s", id) + } + dir := filepath.Join(k.rootDir, id[0:1]) + err := os.MkdirAll(dir, 0755) + if err != nil { + return err + } + tempFile := filepath.Join(dir, id+".temp") + destFile := filepath.Join(dir, id) + + tempF, err := os.Create(tempFile) + if err != nil { + return fmt.Errorf("fsblob: put data: create temp file: %w", err) + } + err = handler(tempF) + flushErr := tempF.Close() + if err != nil { + _ = os.Remove(tempFile) + return err + } + if flushErr != nil { + _ = os.Remove(tempFile) + return fmt.Errorf("fsblob: put data: flush content to temp file: %w", err) + } + + err = os.Rename(tempFile, destFile) + if err != nil { + return fmt.Errorf("fsblob: put data: commit file: %w", err) + } + return nil +} + +func (k *fsBlob) Get(id string) (io.ReadCloser, error) { + if !k.checker(id) { + return nil, fmt.Errorf("get: invalid id %s", id) + } + dir := filepath.Join(k.rootDir, id[0:1]) + destFile := filepath.Join(dir, id) + f, err := os.Open(destFile) + if err == nil { + return f, nil + } + return nil, fmt.Errorf("fsblob: get file: %w", err) +} diff --git a/services/blob/interface.go b/services/blob/interface.go new file mode 100644 index 0000000..6efc49d --- /dev/null +++ b/services/blob/interface.go @@ -0,0 +1,11 @@ +package blob + +import "io" + +// Large (more then memory) content storage. +type Blob interface { + // Push content to the storage using provided writer. + Push(id string, handler func(out io.Writer) error) error + // Get content from the storage. + Get(id string) (io.ReadCloser, error) +} diff --git a/services/meta/interface.go b/services/meta/interface.go new file mode 100644 index 0000000..0300f36 --- /dev/null +++ b/services/meta/interface.go @@ -0,0 +1,35 @@ +package meta + +import ( + "net/http" + "time" +) + +type Meta interface { + Get(requestID string) (*Request, error) + CreateRequest(requestID string, headers http.Header, uri string, method string) error + AddAttempt(requestID, attemptID string, header AttemptHeader) (*Request, error) + Complete(requestID string) error + Iterate(handler func(id string, record Request) error) error +} + +type AttemptHeader struct { + Code int `json:"code"` + Headers http.Header `json:"headers"` +} + +type Attempt struct { + AttemptHeader + ID string `json:"id"` + CreatedAt time.Time `json:"created_at"` +} + +type Request struct { + CreatedAt time.Time `json:"created_at"` + CompleteAt time.Time `json:"complete_at,omitempty"` + Attempts []Attempt `json:"attempts"` + Headers http.Header `json:"headers"` + URI string `json:"uri"` + Method string `json:"method"` + Complete bool `json:"complete"` +} diff --git a/services/meta/micrometa/request_meta_storage.go b/services/meta/micrometa/request_meta_storage.go new file mode 100644 index 0000000..edfbdc4 --- /dev/null +++ b/services/meta/micrometa/request_meta_storage.go @@ -0,0 +1,148 @@ +package micrometa + +import ( + "encoding/json" + "log" + "net/http" + "os" + "path/filepath" + "time" + + "github.com/dgraph-io/badger" + + "nano-run/internal" + "nano-run/services/meta" +) + +func WrapMetaStorage(db *badger.DB) *MicroMeta { + return &MicroMeta{db: db, wrapped: true} +} + +func NewMetaStorage(location string) (*MicroMeta, error) { + name := filepath.Base(location) + db, err := badger.Open(badger.DefaultOptions(location). + WithLogger(internal.NanoLogger(log.New(os.Stderr, "["+name+"] ", log.LstdFlags)))) + if err != nil { + return nil, err + } + return &MicroMeta{ + db: db, + wrapped: false, + }, nil +} + +type MicroMeta struct { + db *badger.DB + wrapped bool +} + +func (rms *MicroMeta) Get(requestID string) (*meta.Request, error) { + var ans meta.Request + return &ans, rms.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(requestID)) + if err != nil { + return err + } + return item.Value(func(val []byte) error { + return json.Unmarshal(val, &ans) + }) + }) +} + +func (rms *MicroMeta) CreateRequest(requestID string, headers http.Header, uri string, method string) error { + var record = meta.Request{ + CreatedAt: time.Now(), + Attempts: make([]meta.Attempt, 0), + Complete: false, + Headers: headers, + URI: uri, + Method: method, + } + data, err := json.Marshal(record) + if err != nil { + return err + } + return rms.db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(requestID), data) + }) +} + +func (rms *MicroMeta) AddAttempt(requestID, attemptID string, header meta.AttemptHeader) (*meta.Request, error) { + var ans meta.Request + err := rms.updateRequest(requestID, func(record *meta.Request) error { + record.Attempts = append(record.Attempts, meta.Attempt{ + ID: attemptID, + CreatedAt: time.Now(), + AttemptHeader: header, + }) + ans = *record + return nil + }) + return &ans, err +} + +func (rms *MicroMeta) Complete(requestID string) error { + return rms.updateRequest(requestID, func(record *meta.Request) error { + record.Complete = true + record.CompleteAt = time.Now() + return nil + }) +} + +func (rms *MicroMeta) Close() error { + if rms.wrapped { + return nil + } + return rms.db.Close() +} + +func (rms *MicroMeta) Iterate(handler func(id string, record meta.Request) error) error { + return rms.db.View(func(txn *badger.Txn) error { + iter := txn.NewIterator(badger.DefaultIteratorOptions) + iter.Rewind() + defer iter.Close() + for iter.Valid() { + id := string(iter.Item().Key()) + var rec meta.Request + err := iter.Item().Value(func(val []byte) error { + return json.Unmarshal(val, &rec) + }) + if err != nil { + return err + } + err = handler(id, rec) + if err != nil { + return err + } + iter.Next() + } + return nil + }) +} + +func (rms *MicroMeta) updateRequest(requestID string, tx func(record *meta.Request) error) error { + return rms.db.Update(func(txn *badger.Txn) error { + data, err := txn.Get([]byte(requestID)) + if err != nil { + return err + } + var record meta.Request + err = data.Value(func(val []byte) error { + return json.Unmarshal(val, &record) + }) + if err != nil { + return err + } + + err = tx(&record) + if err != nil { + return err + } + + value, err := json.Marshal(record) + if err != nil { + return err + } + return txn.Set([]byte(requestID), value) + }) +} diff --git a/services/queue/interface.go b/services/queue/interface.go new file mode 100644 index 0000000..5e3bebc --- /dev/null +++ b/services/queue/interface.go @@ -0,0 +1,8 @@ +package queue + +import "context" + +type Queue interface { + Push(payload []byte) error + Get(ctx context.Context) ([]byte, error) +} diff --git a/services/queue/microqueue/micro_queue.go b/services/queue/microqueue/micro_queue.go new file mode 100644 index 0000000..d5cf71d --- /dev/null +++ b/services/queue/microqueue/micro_queue.go @@ -0,0 +1,123 @@ +package microqueue + +import ( + "context" + "encoding/binary" + "errors" + "log" + "os" + "path/filepath" + "sync/atomic" + + "github.com/dgraph-io/badger" + + "nano-run/internal" +) + +var ErrEmptyQueue = errors.New("empty queue") + +func WrapMicroQueue(db *badger.DB) (*MicroQueue, error) { + mc := &MicroQueue{db: db, wrapped: true, notify: make(chan struct{}, 1), close: make(chan struct{})} + return mc, mc.db.DropAll() +} + +func NewMicroQueue(location string) (*MicroQueue, error) { + name := filepath.Base(location) + db, err := badger.Open(badger.DefaultOptions(location).WithTruncate(true).WithLogger(internal.NanoLogger(log.New(os.Stderr, "["+name+"] ", log.LstdFlags)))) + if err != nil { + return nil, err + } + return &MicroQueue{db: db, notify: make(chan struct{}, 1), close: make(chan struct{})}, nil +} + +// Always fresh queue with offloading to fs if no readers. +// Optimized for multiple readers and multiple writers with number of items limited by FS +// and each value should fit to RAM. +type MicroQueue struct { + db *badger.DB + sequence uint64 + wrapped bool + notify chan struct{} + close chan struct{} +} + +func (mq *MicroQueue) Push(payload []byte) error { + id := atomic.AddUint64(&mq.sequence, 1) + var key [8]byte + binary.BigEndian.PutUint64(key[:], id) + err := mq.db.Update(func(txn *badger.Txn) error { + return txn.Set(key[:], payload) + }) + if err != nil { + return err + } + mq.sendNotify() + return nil +} + +func (mq *MicroQueue) pop() ([]byte, error) { + var ans []byte + err := mq.db.Update(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.IteratorOptions{ + PrefetchValues: true, + PrefetchSize: 1, + Reverse: false, + AllVersions: false, + }) + defer it.Close() + it.Rewind() + + if !it.Valid() { + return ErrEmptyQueue + } + v, err := it.Item().ValueCopy(ans) + if err != nil { + return err + } + ans = v + + return txn.Delete(it.Item().Key()) + }) + if err != nil { + return nil, err + } + mq.sendNotify() + return ans, nil +} + +// Get blocking for new item in a queue. +func (mq *MicroQueue) Get(ctx context.Context) ([]byte, error) { + mq.sendNotify() + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-mq.close: + return nil, errors.New("queue closed") + case <-mq.notify: + v, err := mq.pop() + if err == nil { + return v, nil + } + if errors.Is(err, ErrEmptyQueue) { + continue + } + return nil, err + } + } +} + +func (mq *MicroQueue) Close() error { + close(mq.close) + if mq.wrapped { + return nil + } + return mq.db.Close() +} + +func (mq *MicroQueue) sendNotify() { + select { + case mq.notify <- struct{}{}: + default: + } +} diff --git a/worker/request_io.go b/worker/request_io.go new file mode 100644 index 0000000..ad15acb --- /dev/null +++ b/worker/request_io.go @@ -0,0 +1,37 @@ +package worker + +import ( + "io" + "net/http" + + "nano-run/services/meta" +) + +func openResponse(writer io.Writer) *responseStream { + return &responseStream{stream: writer, meta: meta.AttemptHeader{Headers: make(http.Header)}} +} + +type responseStream struct { + meta meta.AttemptHeader + statusSent bool + stream io.Writer +} + +func (mo *responseStream) Header() http.Header { + return mo.meta.Headers +} + +func (mo *responseStream) Write(bytes []byte) (int, error) { + if !mo.statusSent { + mo.WriteHeader(http.StatusOK) + } + return mo.stream.Write(bytes) +} + +func (mo *responseStream) WriteHeader(statusCode int) { + if mo.statusSent { + return + } + mo.statusSent = true + mo.meta.Code = statusCode +} diff --git a/worker/worker.go b/worker/worker.go new file mode 100644 index 0000000..2e90219 --- /dev/null +++ b/worker/worker.go @@ -0,0 +1,415 @@ +package worker + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "path/filepath" + "runtime" + "sync" + "time" + + "github.com/google/uuid" + + "nano-run/services/blob" + "nano-run/services/blob/fsblob" + "nano-run/services/meta" + "nano-run/services/meta/micrometa" + "nano-run/services/queue" + "nano-run/services/queue/microqueue" +) + +type ( + CompleteHandler func(ctx context.Context, requestID string, info *meta.Request) + ProcessHandler func(ctx context.Context, requestID, attemptID string, info *meta.Request) +) + +const ( + defaultAttempts = 3 + defaultInterval = 3 * time.Second + minimalFailedCode = 500 +) + +func Default(location string) (*Worker, error) { + path := filepath.Join(location, "blobs") + err := os.MkdirAll(path, 0755) + if err != nil { + return nil, err + } + storage := fsblob.NewCheck(path, func(id string) bool { + _, err := uuid.Parse(id) + return err == nil + }) + + taskQueue, err := microqueue.NewMicroQueue(filepath.Join(location, "queue")) + if err != nil { + return nil, err + } + requeue, err := microqueue.NewMicroQueue(filepath.Join(location, "requeue")) + if err != nil { + return nil, err + } + metaStorage, err := micrometa.NewMetaStorage(filepath.Join(location, "meta")) + if err != nil { + return nil, err + } + + cleanup := func() { + _ = requeue.Close() + _ = taskQueue.Close() + _ = metaStorage.Close() + } + + wrk, err := New(taskQueue, requeue, storage, metaStorage) + if err != nil { + cleanup() + return nil, err + } + wrk.cleanup = cleanup + return wrk, nil +} + +func New(tasks, requeue queue.Queue, blobs blob.Blob, meta meta.Meta) (*Worker, error) { + wrk := &Worker{ + queue: tasks, + requeue: requeue, + blob: blobs, + meta: meta, + maxAttempts: defaultAttempts, + interval: defaultInterval, + concurrency: runtime.NumCPU(), + handler: http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + writer.WriteHeader(http.StatusNoContent) + }), + } + err := wrk.init() + if err != nil { + return nil, err + } + return wrk, nil +} + +type Worker struct { + queue queue.Queue + requeue queue.Queue + blob blob.Blob + meta meta.Meta + handler http.Handler + cleanup func() + + onDead CompleteHandler + onSuccess CompleteHandler + onProcess ProcessHandler + maxAttempts int + concurrency int + interval time.Duration +} + +func (mgr *Worker) init() error { + return mgr.meta.Iterate(func(id string, record meta.Request) error { + if !record.Complete { + log.Println("found incomplete job", id) + return mgr.queue.Push([]byte(id)) + } + return nil + }) +} + +func (mgr *Worker) Close() { + if fn := mgr.cleanup; fn != nil { + fn() + } +} + +func (mgr *Worker) Enqueue(req *http.Request) (string, error) { + id, err := mgr.saveRequest(req) + if err != nil { + return "", err + } + log.Println("new request saved:", id) + err = mgr.queue.Push([]byte(id)) + return id, err +} + +func (mgr *Worker) OnSuccess(handler CompleteHandler) *Worker { + mgr.onSuccess = handler + return mgr +} + +func (mgr *Worker) OnDead(handler CompleteHandler) *Worker { + mgr.onDead = handler + return mgr +} + +func (mgr *Worker) OnProcess(handler ProcessHandler) *Worker { + mgr.onProcess = handler + return mgr +} + +func (mgr *Worker) Handler(handler http.Handler) *Worker { + mgr.handler = handler + return mgr +} + +func (mgr *Worker) HandlerFunc(fn http.HandlerFunc) *Worker { + mgr.handler = fn + return mgr +} + +// Attempts number of 500x requests. +func (mgr *Worker) Attempts(max int) *Worker { + mgr.maxAttempts = max + return mgr +} + +// Interval between attempts. +func (mgr *Worker) Interval(duration time.Duration) *Worker { + mgr.interval = duration + return mgr +} + +// Concurrency limit (number of parallel tasks). Does not affect already running worker. +// 0 means num CPU. +func (mgr *Worker) Concurrency(num int) *Worker { + mgr.concurrency = num + if num == 0 { + mgr.concurrency = runtime.NumCPU() + } + return mgr +} + +// Meta information about requests. +func (mgr *Worker) Meta() meta.Meta { + return mgr.meta +} + +// Blobs storage (for large objects). +func (mgr *Worker) Blobs() blob.Blob { + return mgr.blob +} + +func (mgr *Worker) Run(global context.Context) error { + if mgr.interval < 0 { + return fmt.Errorf("negative interval") + } + if mgr.maxAttempts < 0 { + return fmt.Errorf("negative attempts") + } + if mgr.handler == nil { + return fmt.Errorf("nil handler") + } + if mgr.concurrency <= 0 { + return fmt.Errorf("invalid concurrency number") + } + ctx, cancel := context.WithCancel(global) + defer cancel() + var wg sync.WaitGroup + for i := 0; i < mgr.concurrency; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + defer cancel() + err := mgr.runQueue(ctx) + if err != nil { + log.Println("worker", i, "stopped due to error:", err) + } else { + log.Println("worker", i, "stopped") + } + }(i) + } + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + err := mgr.runReQueue(ctx) + if err != nil { + log.Println("re-queue process stopped due to error:", err) + } else { + log.Println("re-queue process stopped") + } + }() + wg.Wait() + return ctx.Err() +} + +func (mgr *Worker) call(ctx context.Context, requestID string, info *meta.Request) error { + // caller should ensure that request id is valid + f, err := mgr.blob.Get(requestID) + if err != nil { + return err + } + defer f.Close() + + req, err := http.NewRequestWithContext(ctx, info.Method, info.URI, f) + if err != nil { + return err + } + for k, v := range info.Headers { + req.Header[k] = v + } + + attemptID := uuid.New().String() + + var header meta.AttemptHeader + + err = mgr.blob.Push(attemptID, func(out io.Writer) error { + res := openResponse(out) + mgr.handler.ServeHTTP(res, req) + header = res.meta + return nil + }) + if err != nil { + return err + } + + info, err = mgr.meta.AddAttempt(requestID, attemptID, header) + if err != nil { + return err + } + + mgr.requestProcessed(ctx, requestID, attemptID, info) + if header.Code >= minimalFailedCode { + return fmt.Errorf("500 code returned: %d", header.Code) + } + return nil +} + +func (mgr *Worker) runQueue(ctx context.Context) error { + for { + err := mgr.processQueueItem(ctx) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + } +} + +func (mgr *Worker) runReQueue(ctx context.Context) error { + for { + err := mgr.processReQueueItem(ctx) + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + } +} + +func (mgr *Worker) processQueueItem(ctx context.Context) error { + bid, err := mgr.queue.Get(ctx) + if err != nil { + return err + } + id := string(bid) + log.Println("processing request", id) + info, err := mgr.meta.Get(id) + if err != nil { + return fmt.Errorf("get request %s meta info: %w", id, err) + } + if info.Complete { + return fmt.Errorf("request %s already complete", id) + } + err = mgr.call(ctx, id, info) + if err == nil { + mgr.requestSuccess(ctx, id, info) + return nil + } + return mgr.requeueItem(ctx, id, info) +} + +func (mgr *Worker) processReQueueItem(ctx context.Context) error { + var item requeueItem + + data, err := mgr.requeue.Get(ctx) + if err != nil { + return err + } + err = json.Unmarshal(data, &item) + + if err != nil { + return err + } + + d := time.Since(item.At) + if d < mgr.interval { + select { + case <-time.After(mgr.interval - d): + case <-ctx.Done(): + return ctx.Err() + } + } + return mgr.queue.Push([]byte(item.ID)) +} + +func (mgr *Worker) requeueItem(ctx context.Context, id string, info *meta.Request) error { + if len(info.Attempts) >= mgr.maxAttempts { + mgr.requestDead(ctx, id, info) + log.Println("maximum attempts reached for request", id) + return nil + } + data, err := json.Marshal(requeueItem{ + At: time.Now(), + ID: id, + }) + if err != nil { + return err + } + return mgr.requeue.Push(data) +} + +func (mgr *Worker) saveRequest(req *http.Request) (string, error) { + id := uuid.New().String() + err := mgr.blob.Push(id, func(out io.Writer) error { + _, err := io.Copy(out, req.Body) + return err + }) + if err != nil { + return "", err + } + return id, mgr.meta.CreateRequest(id, req.Header, req.RequestURI, req.Method) +} + +func (mgr *Worker) requestDead(ctx context.Context, id string, info *meta.Request) { + err := mgr.meta.Complete(id) + if err != nil { + log.Println("failed complete (dead) request:", err) + } + if handler := mgr.onDead; handler != nil { + handler(ctx, id, info) + } + log.Println("request", id, "completely failed") +} + +func (mgr *Worker) requestSuccess(ctx context.Context, id string, info *meta.Request) { + err := mgr.meta.Complete(id) + if err != nil { + log.Println("failed complete (success) request:", err) + } + if handler := mgr.onSuccess; handler != nil { + handler(ctx, id, info) + } + log.Println("request", id, "complete successfully") +} + +func (mgr *Worker) requestProcessed(ctx context.Context, id string, attemptID string, info *meta.Request) { + if handler := mgr.onProcess; handler != nil { + handler(ctx, id, attemptID, info) + } + log.Println("request", id, "processed with attempt", attemptID) +} + +type requeueItem struct { + At time.Time + ID string +}