justificatifs : archivage+test api (sauf export)

This commit is contained in:
iziram 2023-02-01 20:00:14 +01:00
parent 8bc780f2cf
commit 547040bb93
5 changed files with 332 additions and 39 deletions

View File

@ -24,22 +24,6 @@ from flask_login import login_required
from app.scodoc.sco_utils import json_error
# @bp.route("/justificatif/import")
# @api_web_bp.route("/justificatif/import")
# @scodoc
# def justificatif():
# """ """
# archiver: JustificatifArchiver = JustificatifArchiver()
# filename: str = "lol.txt"
# data: bytes = "test".encode("utf-8")
# archiver.save_justificatif(
# etudid=1, filename=filename, data=data, archive_id="2023-02-01-10-29-20"
# )
# return jsonify([filename, "done"])
# @bp.route("/justificatif/remove")
# @api_web_bp.route("/justificatif/remove")
# @scodoc
@ -133,8 +117,6 @@ def justificatifs(etudid: int = None, with_query: bool = False):
# TODO: justificatif-create
@bp.route("/justificatif/<int:etudid>/create", methods=["POST"])
@api_web_bp.route("/justificatif/<int:etudid>/create", methods=["POST"])
@scodoc
@ -336,9 +318,177 @@ def _delete_singular(justif_id: int, database):
# Partie archivage
# TODO: justificatif-import
@bp.route("/justificatif/import/<int:justif_id>", methods=["POST"])
@api_web_bp.route("/justificatif/import/<int:justif_id>", methods=["POST"])
@scodoc
@login_required
@permission_required(Permission.ScoView)
# @permission_required(Permission.ScoAssiduiteChange)
def justif_import(justif_id: int = None):
"""
Importation d'un fichier (création d'archive)
"""
if len(request.files) == 0:
return json_error(404, "Il n'y a pas de fichier joint")
file = list(request.files.values())[0]
if file.filename == "":
return json_error(404, "Il n'y a pas de fichier joint")
query = Justificatif.query.filter_by(id=justif_id)
if g.scodoc_dept:
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
justificatif_unique: Justificatif = query.first_or_404()
archive_name: str = justificatif_unique.fichier
archiver: JustificatifArchiver = JustificatifArchiver()
try:
archive_name: str = archiver.save_justificatif(
etudid=justificatif_unique.etudid,
filename=file.filename,
data=file.stream.read(),
archive_name=archive_name,
)
justificatif_unique.fichier = archive_name
db.session.add(justificatif_unique)
db.session.commit()
return jsonify({"response": "imported"})
except ScoValueError as err:
return json_error(404, err.args[1])
# TODO: justificatif-export
@bp.route("/justificatif/export/<int:justif_id>/<filename>", methods=["GET"])
@api_web_bp.route("/justificatif/export/<int:justif_id>/<filename>", methods=["GET"])
@scodoc
@login_required
@permission_required(Permission.ScoView)
# @permission_required(Permission.ScoAssiduiteChange)
def justif_export(justif_id: int = None, filename: str = None):
"""
Retourne un fichier d'une archive d'un justificatif
"""
query = Justificatif.query.filter_by(id=justif_id)
if g.scodoc_dept:
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
justificatif_unique: Justificatif = query.first_or_404()
archive_name: str = justificatif_unique.fichier
if archive_name is None:
return json_error(404, "le justificatif ne possède pas de fichier")
archiver: JustificatifArchiver = JustificatifArchiver()
try:
return archiver.get_justificatif_file(
archive_name, justificatif_unique.etudid, filename
)
except ScoValueError as err:
return json_error(404, err.args[1])
# TODO: justificatif-remove
@bp.route("/justificatif/remove/<int:justif_id>", methods=["POST"])
@api_web_bp.route("/justificatif/remove/<int:justif_id>", methods=["POST"])
@scodoc
@login_required
@permission_required(Permission.ScoView)
# @permission_required(Permission.ScoAssiduiteChange)
def justif_remove(justif_id: int = None):
"""
Supression d'un fichier ou d'une archive
{
"remove": <"all"/"list">
"filenames"?: [
<filename:str>,
...
]
}
"""
data: dict = request.get_json(force=True)
query = Justificatif.query.filter_by(id=justif_id)
if g.scodoc_dept:
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
justificatif_unique: Justificatif = query.first_or_404()
archive_name: str = justificatif_unique.fichier
if archive_name is None:
return json_error(404, "le justificatif ne possède pas de fichier")
remove: str = data.get("remove")
if remove is None or remove not in ("all", "list"):
return json_error(404, "param 'remove': Valeur invalide")
archiver: JustificatifArchiver = JustificatifArchiver()
etudid: int = justificatif_unique.etudid
try:
if remove == "all":
archiver.delete_justificatif(etudid=etudid, archive_name=archive_name)
justificatif_unique.fichier = None
db.session.add(justificatif_unique)
db.session.commit()
else:
for fname in data.get("filenames", []):
archiver.delete_justificatif(
etudid=etudid,
archive_name=archive_name,
filename=fname,
)
if len(archiver.list_justificatifs(archive_name, etudid)) == 0:
archiver.delete_justificatif(etudid, archive_name)
justificatif_unique.fichier = None
db.session.add(justificatif_unique)
db.session.commit()
except ScoValueError as err:
return json_error(404, err.args[1])
return jsonify({"response": "removed"})
# TODO: justificatif-list
@bp.route("/justificatif/list/<int:justif_id>", methods=["GET"])
@api_web_bp.route("/justificatif/list/<int:justif_id>", methods=["GET"])
@scodoc
@login_required
@permission_required(Permission.ScoView)
# @permission_required(Permission.ScoAssiduiteChange)
def justif_list(justif_id: int = None):
"""
Liste les fichiers du justificatif
"""
query = Justificatif.query.filter_by(id=justif_id)
if g.scodoc_dept:
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
justificatif_unique: Justificatif = query.first_or_404()
archive_name: str = justificatif_unique.fichier
filenames: list[str] = []
archiver: JustificatifArchiver = JustificatifArchiver()
if archive_name is not None:
filenames = archiver.list_justificatifs(
archive_name, justificatif_unique.etudid
)
return jsonify(filenames)
# Partie justification
# TODO: justificatif-justified

View File

@ -1,4 +1,7 @@
from app.scodoc.sco_archives import BaseArchiver
from app.scodoc.sco_exceptions import ScoValueError
from app.models import Identite, Departement
from flask import g
import os
@ -29,34 +32,39 @@ class JustificatifArchiver(BaseArchiver):
etudid: int,
filename: str,
data: bytes or str,
archive_id: str = None,
archive_name: str = None,
description: str = "",
):
) -> str:
"""
Ajoute un fichier dans une archive "justificatif" pour l'etudid donné
Retourne l'archive_name utilisé
"""
if archive_id is None:
self._set_dept(etudid)
if archive_name is None:
archive_id: str = self.create_obj_archive(
oid=etudid, description=description
)
else:
archive_id = self._true_archive_id(archive_id, etudid)
archive_id: str = self.get_id_from_name(etudid, archive_name)
self.store(archive_id, filename, data)
def delete_justificatif(self, etudid: int, archive_id: str, filename: str = None):
return self.get_archive_name(archive_id)
def delete_justificatif(self, etudid: int, archive_name: str, filename: str = None):
"""
Supprime une archive ou un fichier particulier de l'archivage de l'étudiant donné
"""
self._set_dept(etudid)
if str(etudid) not in self.list_oids():
raise ValueError(f"Aucune archive pour etudid[{etudid}]")
archive_id = self._true_archive_id(archive_id, etudid)
archive_id = self.get_id_from_name(etudid, archive_name)
if filename is not None:
if filename not in self.list_archive(archive_id):
raise ValueError(
f"filename inconnu dans l'archive archive_id[{archive_id}] -> etudid[{etudid}]"
f"filename {filename} inconnu dans l'archive archive_id[{archive_id}] -> etudid[{etudid}]"
)
path: str = os.path.join(self.get_obj_dir(etudid), archive_id, filename)
@ -72,19 +80,33 @@ class JustificatifArchiver(BaseArchiver):
)
)
def _true_archive_id(self, archive_id: str, etudid: int):
def list_justificatifs(self, archive_name: str, etudid: int) -> list[str]:
"""
Test si l'archive_id est bien dans le dossier d'archive
Retourne le chemin complet de l'id
Retourne la liste des noms de fichiers dans l'archive donnée
"""
archives: list[str] = [
arc for arc in self.list_obj_archives(etudid) if archive_id in arc
]
self._set_dept(etudid)
filenames: list[str] = []
archive_id = self.get_id_from_name(etudid, archive_name)
if len(archives) == 0:
raise ValueError(
f"archive_id[{archive_id}] inconnu pour etudid[{etudid}]",
self.list_obj_archives(etudid),
filenames = self.list_archive(archive_id)
return filenames
def get_justificatif_file(self, archive_name: str, etudid: int, filename: str):
"""
Retourne une réponse de téléchargement de fichier si le fichier existe
"""
self._set_dept(etudid)
archive_id: str = self.get_id_from_name(etudid, archive_name)
if filename in self.list_archive(archive_id):
return self.get_archived_file(etudid, archive_name, filename)
raise ScoValueError(
f"Fichier {filename} introuvable dans l'archive {archive_name}"
)
return archives[0]
def _set_dept(self, etudid: int):
if g.scodoc_dept is None or g.scodoc_dept_id is None:
etud: Identite = Identite.query.filter_by(id=etudid).first()
dept: Departement = Departement.query.filter_by(id=etud.dept_id).first()
g.scodoc_dept = dept.acronym
g.scodoc_dept_id = dept.id

View File

@ -0,0 +1 @@
test de l'importation des fichiers / archive justificatif

View File

@ -0,0 +1 @@
test de l'importation des fichiers / archive justificatif

View File

@ -7,7 +7,15 @@ Ecrit par HARTMANN Matthias
from random import randint
from tests.api.setup_test_api import GET, POST_JSON, APIError, api_headers
from tests.api.setup_test_api import (
GET,
POST_JSON,
APIError,
api_headers,
API_URL,
CHECK_CERTIFICATE,
)
import requests
ETUDID = 1
FAUX = 42069
@ -60,7 +68,7 @@ def check_failure_post(path, headers, data, err=None):
if err is not None:
assert api_err.payload["message"] == err
else:
raise APIError("Le GET n'aurait pas du fonctionner")
raise APIError("Le POST n'aurait pas du fonctionner")
def create_data(etat: str, day: str, raison: str = None):
@ -253,3 +261,114 @@ def test_route_delete(api_headers):
assert len(res["errors"]) == 3
assert all([res["errors"][i] == "Justificatif non existant" for i in res["errors"]])
# Gestion de l'archivage
def send_file(justif_id: int, filename: str, headers):
"""
Envoi un fichier vers la route d'importation
"""
with open(filename, "rb") as file:
url: str = API_URL + f"/justificatif/import/{justif_id}"
r = requests.post(
url,
files={filename: file},
headers=headers,
verify=CHECK_CERTIFICATE,
)
if r.status_code != 200:
raise APIError(f"erreur status={r.status_code} !", r.json())
else:
return r.json()
def check_failure_send(
justif_id: int,
headers,
filename: str = "/opt/scodoc/tests/api/test_api_justificatif.txt",
err: str = None,
):
try:
data = send_file(justif_id, filename, headers)
# ^ Renvoi un 404
except APIError as api_err:
if err is not None:
assert api_err.payload["message"] == err
else:
raise APIError("Le POST n'aurait pas du fonctionner")
def test_import_justificatif(api_headers):
# Bon fonctionnement
filename: str = "/opt/scodoc/tests/api/test_api_justificatif.txt"
resp: dict = send_file(1, filename, api_headers)
assert "response" in resp
assert resp["response"] == "imported"
filename: str = "/opt/scodoc/tests/api/test_api_justificatif2.txt"
resp: dict = send_file(1, filename, api_headers)
assert "response" in resp
assert resp["response"] == "imported"
# Mauvais fonctionnement
check_failure_send(FAUX, api_headers)
def test_list_justificatifs(api_headers):
# Bon fonctionnement
res: list = GET("/justificatif/list/1", api_headers)
assert isinstance(res, list)
assert len(res) == 2
res: list = GET("/justificatif/list/2", api_headers)
assert isinstance(res, list)
assert len(res) == 0
# Mauvais fonctionnement
check_failure_get(f"/justificatif/list/{FAUX}", api_headers)
def test_remove_justificatif(api_headers):
# Bon fonctionnement
filename: str = "/opt/scodoc/tests/api/test_api_justificatif.txt"
send_file(2, filename, api_headers)
filename: str = "/opt/scodoc/tests/api/test_api_justificatif2.txt"
send_file(2, filename, api_headers)
res: dict = POST_JSON("/justificatif/remove/1", {"remove": "all"}, api_headers)
assert res == {"response": "removed"}
assert len(GET("/justificatif/list/1", api_headers)) == 0
res: dict = POST_JSON(
"/justificatif/remove/2",
{"remove": "list", "filenames": ["test_api_justificatif2.txt"]},
api_headers,
)
assert res == {"response": "removed"}
assert len(GET("/justificatif/list/2", api_headers)) == 1
res: dict = POST_JSON(
"/justificatif/remove/2",
{"remove": "list", "filenames": ["test_api_justificatif.txt"]},
api_headers,
)
assert res == {"response": "removed"}
assert len(GET("/justificatif/list/2", api_headers)) == 0
# Mauvais fonctionnement
check_failure_post("/justificatif/remove/2", api_headers, {})
check_failure_post(f"/justificatif/remove/{FAUX}", api_headers, {"remove": "all"})