# -*- coding: utf-8 -*-

"""Helpers shared by the ScoDoc API tests.

Usage: define the following environment variables (use the values of the
ScoDoc server you want to query):

    export SCODOC_URL="https://scodoc.xxx.net/"
    export API_USER="xxx"
    export API_PASSWORD="xxx"   # API_PASSWD is also accepted
    export CHECK_CERTIFICATE=0  # or 1 for a production server with a valid SSL certificate

(these values can also be placed in a .env file in the tests/api directory).
"""
import os

import requests
from dotenv import load_dotenv
import pytest

# --- Read the configuration (environment variables or .env file)
try:
    BASEDIR = os.path.abspath(os.path.dirname(__file__))
except NameError:
    # __file__ is not defined: fall back to the hard-coded location.
    BASEDIR = "/opt/scodoc/tests/api"

load_dotenv(os.path.join(BASEDIR, ".env"))


def _env_flag(name: str, default: bool = False) -> bool:
    """Return the boolean value of environment variable *name*.

    Unset returns *default*; "", "0", "false", "no" and "off" (any case)
    are False; every other value is True. A plain ``bool()`` on the raw
    string would treat "0" as True.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() not in ("", "0", "false", "no", "off")


CHECK_CERTIFICATE = _env_flag("CHECK_CERTIFICATE", False)
# Fall back to the local server when the variable is unset or empty, and drop
# any trailing slash so that the concatenations below never produce "//".
SCODOC_URL = (os.environ.get("SCODOC_URL") or "http://localhost:5000").rstrip("/")
API_URL = SCODOC_URL + "/ScoDoc/api"
API_USER = os.environ.get("API_USER", "test")
API_PASSWORD = os.environ.get("API_PASSWORD", os.environ.get("API_PASSWD", "test"))
API_USER_ADMIN = os.environ.get("API_USER_ADMIN", "admin_api")
API_PASSWORD_ADMIN = os.environ.get("API_PASSWD_ADMIN", "admin_api")
DEPT_ACRONYM = "TAPI"
SCO_TEST_API_TIMEOUT = 5
print(f"SCODOC_URL={SCODOC_URL}")
print(f"API URL={API_URL}")


class APIError(Exception):
    """Error raised when an API request fails or returns unexpected content.

    Attributes:
        message: human-readable description (also used as ``str(exc)``).
        payload: data attached to the error (decoded error body when
            available), ``{}`` when none was given.
    """

    def __init__(self, message: str = "", payload=None):
        # Pass the message to Exception so str(exc) and tracebacks show it.
        super().__init__(message)
        self.message = message
        self.payload = payload or {}


def _api_url(path: str, dept=None) -> str:
    """Build the URL of API route *path*, department-scoped when *dept* is given."""
    if dept:
        return SCODOC_URL + f"/ScoDoc/{dept}/api" + path
    return API_URL + path


def _error_payload(reply):
    """Return the decoded JSON body of *reply*, or ``{}`` if it is not JSON."""
    try:
        return reply.json()
    except ValueError:
        # Non-JSON error body (e.g. an HTML error page): keep reporting the
        # HTTP error rather than failing on the decode.
        return {}


def get_auth_headers(user, password) -> dict:
    """Request a token and return the dict to use as HTTP request headers.

    Raises:
        APIError: if the token request does not answer with status 200.
    """
    ans = requests.post(
        API_URL + "/tokens",
        auth=(user, password),
        verify=CHECK_CERTIFICATE,
        timeout=SCO_TEST_API_TIMEOUT,
    )
    if ans.status_code != 200:
        raise APIError(f"Echec demande jeton par {user}")
    token = ans.json()["token"]
    return {"Authorization": f"Bearer {token}"}


@pytest.fixture
def api_headers() -> dict:
    """Token headers for the ordinary API user."""
    return get_auth_headers(API_USER, API_PASSWORD)


@pytest.fixture
def api_admin_headers() -> dict:
    """Token headers for the SuperAdmin API user."""
    return get_auth_headers(API_USER_ADMIN, API_PASSWORD_ADMIN)


def GET(path: str, headers: dict = None, errmsg=None, dept=None):
    """Send a GET request to the API and return the decoded result.

    Args:
        path: API route, appended to the API base URL.
        headers: HTTP headers to send (typically the authorization headers).
        errmsg: message of the APIError raised on a non-200 status.
        dept: department acronym; when given, the department-scoped API
            URL is used.

    Returns:
        The decoded JSON for a JSON reply. For a non-JSON result (image or
        pdf), a dict holding the reply's "Content-Type" and
        "Content-Disposition" (inline or attachment) header values.

    Raises:
        APIError: on a non-200 status or an unsupported content type.
    """
    reply = requests.get(
        _api_url(path, dept),
        headers=headers or {},
        verify=CHECK_CERTIFICATE,
        timeout=SCO_TEST_API_TIMEOUT,
    )
    if reply.status_code != 200:
        raise APIError(
            errmsg or f"""erreur status={reply.status_code} !""",
            _error_payload(reply),
        )
    content_type = reply.headers.get("Content-Type", None)
    # Compare on the bare media type so that parameters such as
    # "; charset=utf-8" do not defeat the match.
    media_type = (content_type or "").split(";", 1)[0].strip().lower()
    if media_type == "application/json":
        return reply.json()  # decode the JSON reply
    if media_type in ("image/jpg", "image/png", "application/pdf"):
        return {
            "Content-Type": content_type,
            "Content-Disposition": reply.headers.get("Content-Disposition", None),
        }
    raise APIError(f"Unknown returned content {content_type} !\n")


def POST_JSON(path: str, data: dict = None, headers: dict = None, errmsg=None, dept=None):
    """Send *data* as JSON in a POST request and return the decoded JSON reply.

    Args:
        path: API route, appended to the API base URL.
        data: JSON-serializable payload; an empty object is sent when omitted.
        headers: HTTP headers to send (typically the authorization headers).
        errmsg: message of the APIError raised on a non-200 status.
        dept: department acronym; when given, the department-scoped API
            URL is used.

    Raises:
        APIError: on a non-200 status.
    """
    r = requests.post(
        _api_url(path, dept),
        # A fresh dict per call instead of a shared mutable default argument.
        json={} if data is None else data,
        headers=headers or {},
        verify=CHECK_CERTIFICATE,
        timeout=SCO_TEST_API_TIMEOUT,
    )
    if r.status_code != 200:
        raise APIError(errmsg or f"erreur status={r.status_code} !", _error_payload(r))
    return r.json()  # decode the JSON reply