Functions/template/python3-fastapi/function/handler.py

51 lines
1.6 KiB
Python

# author: Justin Guese, 11.3.22, justin@datafortress.cloud
from fastapi import HTTPException
from fastapi import APIRouter
from pydantic import BaseModel
from typing import Dict, List
from os import environ
import glob
# reads in secrets to environment variables, such that they can be
# easily used with environ["SECRET_NAME"]
def readSecretToEnv(secretpath):
secretname = secretpath.split('/')[-1]
with open(secretpath, "r") as f:
environ[secretname] = f.read()
for secret in glob.glob("/var/openfaas/secrets/*"):
readSecretToEnv(secret)
router = APIRouter()
# just as an example
class User(BaseModel):
id: int
name: str
age: int
colleagues: List[str]
class ResponseModel(BaseModel):
data: Dict
# user: User
# otherStuff: str
@router.post("/", response_model = ResponseModel, tags=["Main Routes"])
def handle(request: Dict):
"""handle a request to the function
Args:
req (dict): request body
"""
try:
res = ResponseModel(data=request)
except Exception as e:
raise HTTPException(status_code=500, detail=str(repr(e)))
return res
@router.get("/", response_model = ResponseModel, tags=["Main Routes"])
def get():
return ResponseModel(data={"message": "Hello from OpenFAAS!"})
# again just as an example, delete this if not required
@router.get("/users/{user_id}", response_model = User, tags=["Main Routes"])
def getUser(user_id: int):
return User(id = user_id, name="Exampleuser", age=20, colleagues=["Colleague 1", "Colleague 2"])