Functions/iol/handler.py

158 lines
5.0 KiB
Python
Raw Normal View History

2023-07-01 22:59:14 +00:00
import logging
import time
from functools import lru_cache
import requests as r
from fastapi import APIRouter
BASE_URL = "https://api.invertironline.com/"
USER = open("/var/openfaas/secrets/iol-user").read().strip()
PASS = open("/var/openfaas/secrets/iol-pass").read().strip()
SECRET = open("/var/openfaas/secrets/iol-api-secret").read().strip()
router = APIRouter()
2023-08-07 15:14:16 +00:00
last_results = {}
2023-07-01 22:59:14 +00:00
def get_ttl_hash(seconds=3600):
"""Return the same value withing `seconds` time period"""
return round(time.time() / seconds)
2023-08-07 15:14:16 +00:00
2023-07-01 22:59:14 +00:00
@lru_cache
def get_token(ttl_hash=None):
logging.error("getting token")
url = BASE_URL + "token"
data = {"username": USER, "password": PASS, "grant_type": "password"}
response = r.post(url, data=data)
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
return access_token, refresh_token
def refresh_token(refresh_token):
url = BASE_URL + "token"
data = {"refresh_token": refresh_token, "grant_type": "refresh_token"}
response = r.post(url, data=data)
access_token = response.json()["access_token"]
refresh_token = response.json()["refresh_token"]
return access_token, refresh_token
paises = {
"US": "estados_Unidos",
"AR": "argentina",
}
2023-08-07 15:14:16 +00:00
2023-07-01 22:59:14 +00:00
@lru_cache
def get(url, ttl_hash=None):
logging.error("getting data")
access, refresh = get_token(ttl_hash=get_ttl_hash())
2023-08-07 15:14:16 +00:00
response = r.get(
url, headers={"Authorization": "Bearer " + access, "Accept": "application/json"}
)
2023-07-01 22:59:14 +00:00
return response.json()
@router.get("/{secret}/accion/{pais}/{accion}")
def get_accion(secret, pais, accion):
if secret != SECRET:
raise Exception("BAD")
pais = pais.upper()
accion = accion.upper()
2023-08-07 15:14:16 +00:00
try:
access, refresh = get_token(ttl_hash=get_ttl_hash())
url = (
BASE_URL
+ f"/api/v2/Cotizaciones/acciones/{pais}/Todos?cotizacionInstrumentoModel.instrumento=acciones&cotizacionInstrumentoModel.pais={paises[pais]}&api_key={access}"
)
data = get(url, ttl_hash=get_ttl_hash())
2023-08-07 15:14:16 +00:00
last_results[f"accion/{pais}/{accion}"] = [
a for a in data["titulos"] if a["simbolo"] == accion
][0]
except:
pass
return last_results[f"accion/{pais}/{accion}"]
2023-07-02 00:50:44 +00:00
@router.get("/{secret}/bono/{nombre}")
def get_bono(secret, nombre):
if secret != SECRET:
raise Exception("BAD")
nombre = nombre.upper()
access, refresh = get_token(ttl_hash=get_ttl_hash())
2023-08-07 15:14:16 +00:00
try:
url = (
BASE_URL
+ f"/api/v2/Cotizaciones/titulosPublicos/ar/Todos?cotizacionInstrumentoModel.instrumento=titulosPublicos&cotizacionInstrumentoModel.pais=argentina&api_key={access}"
)
data = get(url, ttl_hash=get_ttl_hash())
2023-08-07 15:14:16 +00:00
last_results[f"bono/{nombre}"] = [
a for a in data["titulos"] if a["simbolo"] == nombre
][0]
except:
pass
return last_results[f"bono/{nombre}"]
2023-07-02 00:50:44 +00:00
@router.get("/{secret}/cedear/{nombre}")
def get_cedear(secret, nombre):
if secret != SECRET:
raise Exception("BAD")
nombre = nombre.upper()
access, refresh = get_token(ttl_hash=get_ttl_hash())
2023-08-07 15:14:16 +00:00
try:
url = (
BASE_URL
+ f"/api/v2/Cotizaciones/cedears/ar/Todos?cotizacionInstrumentoModel.instrumento=cedears&cotizacionInstrumentoModel.pais=argentina&api_key={access}"
)
data = get(url, ttl_hash=get_ttl_hash())
2023-08-07 15:14:16 +00:00
last_results[f"cedear/{nombre}"] = [
a for a in data["titulos"] if a["simbolo"] == nombre
][0]
except:
pass
return last_results[f"cedear/{nombre}"]
2023-07-02 00:50:44 +00:00
@router.get("/{secret}/ON/{nombre}")
def get_on(secret, nombre):
if secret != SECRET:
raise Exception("BAD")
nombre = nombre.upper()
access, refresh = get_token(ttl_hash=get_ttl_hash())
2023-08-07 15:14:16 +00:00
try:
url = (
BASE_URL
+ f"/api/v2/Cotizaciones/obligacionesnegociables/ar/Todos?cotizacionInstrumentoModel.instrumento=obligacionesNegociables&cotizacionInstrumentoModel.pais=argentina&api_key={access}"
)
data = get(url, ttl_hash=get_ttl_hash())
2023-08-07 15:14:16 +00:00
last_results[f"ON/{nombre}"] = [
a for a in data["titulos"] if a["simbolo"] == nombre
][0]
except:
pass
return last_results[f"ON/{nombre}"]
2023-07-02 00:50:44 +00:00
@router.get("/{secret}/ADR/{nombre}")
def get_adrs(secret, nombre):
if secret != SECRET:
raise Exception("BAD")
nombre = nombre.upper()
access, refresh = get_token(ttl_hash=get_ttl_hash())
2023-08-07 15:14:16 +00:00
try:
url = (
BASE_URL
+ f"/api/v2/Cotizaciones/adrs/us/Todos?cotizacionInstrumentoModel.instrumento=aDRs&cotizacionInstrumentoModel.pais=estados_Unidos&api_key={access}"
)
data = get(url, ttl_hash=get_ttl_hash())
2023-08-07 15:14:16 +00:00
last_results[f"ADR/{nombre}"] = [
a for a in data["titulos"] if a["simbolo"] == nombre
][0]
except:
pass
return last_results[f"ADR/{nombre}"]