Fix: gestion des archives (confusion de départements)

This commit is contained in:
Emmanuel Viennet 2023-09-07 23:09:39 +02:00
parent 9187a12882
commit 32b57839c5
9 changed files with 193 additions and 142 deletions

View File

@ -8,8 +8,9 @@
from datetime import datetime
from flask_json import as_json
from flask import g, jsonify, request
from flask import g, request
from flask_login import login_required, current_user
from flask_sqlalchemy.query import Query
import app.scodoc.sco_assiduites as scass
import app.scodoc.sco_utils as scu
@ -26,7 +27,6 @@ from app.scodoc.sco_archives_justificatifs import JustificatifArchiver
from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_permissions import Permission
from app.scodoc.sco_utils import json_error
from flask_sqlalchemy.query import Query
# Partie Modèle
@ -436,7 +436,7 @@ def _delete_singular(justif_id: int, database):
if archive_name is not None:
archiver: JustificatifArchiver = JustificatifArchiver()
try:
archiver.delete_justificatif(justificatif_unique.etudid, archive_name)
archiver.delete_justificatif(justificatif_unique.etudiant, archive_name)
except ValueError:
pass
@ -481,7 +481,7 @@ def justif_import(justif_id: int = None):
try:
fname: str
archive_name, fname = archiver.save_justificatif(
etudid=justificatif_unique.etudid,
justificatif_unique.etudiant,
filename=file.filename,
data=file.stream.read(),
archive_name=archive_name,
@ -512,7 +512,7 @@ def justif_export(justif_id: int = None, filename: str = None):
if g.scodoc_dept:
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
justificatif_unique: Justificaitf = query.first_or_404()
justificatif_unique: Justificatif = query.first_or_404()
archive_name: str = justificatif_unique.fichier
if archive_name is None:
@ -522,7 +522,7 @@ def justif_export(justif_id: int = None, filename: str = None):
try:
return archiver.get_justificatif_file(
archive_name, justificatif_unique.etudid, filename
archive_name, justificatif_unique.etudiant, filename
)
except ScoValueError as err:
return json_error(404, err.args[0])
@ -564,10 +564,10 @@ def justif_remove(justif_id: int = None):
if remove is None or remove not in ("all", "list"):
return json_error(404, "param 'remove': Valeur invalide")
archiver: JustificatifArchiver = JustificatifArchiver()
etudid: int = justificatif_unique.etudid
etud = justificatif_unique.etudiant
try:
if remove == "all":
archiver.delete_justificatif(etudid=etudid, archive_name=archive_name)
archiver.delete_justificatif(etud, archive_name=archive_name)
justificatif_unique.fichier = None
db.session.add(justificatif_unique)
db.session.commit()
@ -575,13 +575,13 @@ def justif_remove(justif_id: int = None):
else:
for fname in data.get("filenames", []):
archiver.delete_justificatif(
etudid=etudid,
etud,
archive_name=archive_name,
filename=fname,
)
if len(archiver.list_justificatifs(archive_name, etudid)) == 0:
archiver.delete_justificatif(etudid, archive_name)
if len(archiver.list_justificatifs(archive_name, etud)) == 0:
archiver.delete_justificatif(etud, archive_name)
justificatif_unique.fichier = None
db.session.add(justificatif_unique)
db.session.commit()
@ -616,16 +616,16 @@ def justif_list(justif_id: int = None):
archiver: JustificatifArchiver = JustificatifArchiver()
if archive_name is not None:
filenames = archiver.list_justificatifs(
archive_name, justificatif_unique.etudid
archive_name, justificatif_unique.etudiant
)
retour = {"total": len(filenames), "filenames": []}
for fi in filenames:
if int(fi[1]) == current_user.id or current_user.has_permission(
for filename in filenames:
if int(filename[1]) == current_user.id or current_user.has_permission(
Permission.ScoJustifView
):
retour["filenames"].append(fi[0])
retour["filenames"].append(filename[0])
return retour

View File

@ -81,7 +81,7 @@ from app.scodoc import sco_pv_pdf
from app.scodoc.sco_exceptions import ScoValueError
class BaseArchiver(object):
class BaseArchiver:
def __init__(self, archive_type=""):
self.archive_type = archive_type
self.initialized = False
@ -92,14 +92,17 @@ class BaseArchiver(object):
"set dept"
self.dept_id = dept_id
def initialize(self):
def initialize(self, dept_id: int = None):
"""Fixe le département et initialise les répertoires au besoin."""
# Set departement (à chaque fois car peut changer d'une utilisation à l'autre)
self.dept_id = getattr(g, "scodoc_dept_id") if dept_id is None else dept_id
if self.initialized:
return
dirs = [Config.SCODOC_VAR_DIR, "archives"]
if self.archive_type:
dirs.append(self.archive_type)
self.root = os.path.join(*dirs)
self.root = os.path.join(*dirs) # /opt/scodoc-data/archives/<type>
log("initialized archiver, path=" + self.root)
path = dirs[0]
for directory in dirs[1:]:
@ -112,15 +115,13 @@ class BaseArchiver(object):
finally:
scu.GSL.release()
self.initialized = True
if self.dept_id is None:
self.dept_id = getattr(g, "scodoc_dept_id")
def get_obj_dir(self, oid: int):
def get_obj_dir(self, oid: int, dept_id: int = None):
"""
:return: path to directory of archives for this object (eg formsemestre_id or etudid).
If directory does not yet exist, create it.
"""
self.initialize()
self.initialize(dept_id)
dept_dir = os.path.join(self.root, str(self.dept_id))
try:
scu.GSL.acquire()
@ -141,21 +142,21 @@ class BaseArchiver(object):
scu.GSL.release()
return obj_dir
def list_oids(self):
def list_oids(self, dept_id: int = None):
"""
:return: list of archive oids
"""
self.initialize()
self.initialize(dept_id)
base = os.path.join(self.root, str(self.dept_id)) + os.path.sep
dirs = glob.glob(base + "*")
return [os.path.split(x)[1] for x in dirs]
def list_obj_archives(self, oid: int):
def list_obj_archives(self, oid: int, dept_id: int = None):
"""Returns
:return: list of archive identifiers for this object (paths to non empty dirs)
"""
self.initialize()
base = self.get_obj_dir(oid) + os.path.sep
self.initialize(dept_id)
base = self.get_obj_dir(oid, dept_id=dept_id) + os.path.sep
dirs = glob.glob(
base
+ "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]"
@ -165,9 +166,9 @@ class BaseArchiver(object):
dirs.sort()
return dirs
def delete_archive(self, archive_id: str):
def delete_archive(self, archive_id: str, dept_id: int = None):
"""Delete (forever) this archive"""
self.initialize()
self.initialize(dept_id)
try:
scu.GSL.acquire()
shutil.rmtree(archive_id, ignore_errors=True)
@ -180,9 +181,9 @@ class BaseArchiver(object):
*[int(x) for x in os.path.split(archive_id)[1].split("-")]
)
def list_archive(self, archive_id: str) -> str:
def list_archive(self, archive_id: str, dept_id: int = None) -> str:
"""Return list of filenames (without path) in archive"""
self.initialize()
self.initialize(dept_id)
try:
scu.GSL.acquire()
files = os.listdir(archive_id)
@ -201,12 +202,12 @@ class BaseArchiver(object):
"^[0-9]{4}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}$", archive_name
)
def get_id_from_name(self, oid, archive_name: str):
def get_id_from_name(self, oid, archive_name: str, dept_id: int = None):
"""returns archive id (check that name is valid)"""
self.initialize()
self.initialize(dept_id)
if not self.is_valid_archive_name(archive_name):
raise ScoValueError(f"Archive {archive_name} introuvable")
archive_id = os.path.join(self.get_obj_dir(oid), archive_name)
archive_id = os.path.join(self.get_obj_dir(oid, dept_id=dept_id), archive_name)
if not os.path.isdir(archive_id):
log(
f"invalid archive name: {archive_name}, oid={oid}, archive_id={archive_id}"
@ -214,9 +215,9 @@ class BaseArchiver(object):
raise ScoValueError(f"Archive {archive_name} introuvable")
return archive_id
def get_archive_description(self, archive_id: str) -> str:
def get_archive_description(self, archive_id: str, dept_id: int = None) -> str:
"""Return description of archive"""
self.initialize()
self.initialize(dept_id)
filename = os.path.join(archive_id, "_description.txt")
try:
with open(filename, encoding=scu.SCO_ENCODING) as f:
@ -229,11 +230,11 @@ class BaseArchiver(object):
return descr
def create_obj_archive(self, oid: int, description: str):
def create_obj_archive(self, oid: int, description: str, dept_id: int = None):
"""Creates a new archive for this object and returns its id."""
# id suffixé par YYYY-MM-DD-hh-mm-ss
archive_id = (
self.get_obj_dir(oid)
self.get_obj_dir(oid, dept_id=dept_id)
+ os.path.sep
+ "-".join([f"{x:02d}" for x in time.localtime()[:6]])
)
@ -248,7 +249,13 @@ class BaseArchiver(object):
self.store(archive_id, "_description.txt", description)
return archive_id
def store(self, archive_id: str, filename: str, data: Union[str, bytes]):
def store(
self,
archive_id: str,
filename: str,
data: Union[str, bytes],
dept_id: int = None,
):
"""Store data in archive, under given filename.
Filename may be modified (sanitized): return used filename
The file is created or replaced.
@ -256,7 +263,7 @@ class BaseArchiver(object):
"""
if isinstance(data, str):
data = data.encode(scu.SCO_ENCODING)
self.initialize()
self.initialize(dept_id)
filename = scu.sanitize_filename(filename)
log(f"storing {filename} ({len(data)} bytes) in {archive_id}")
try:
@ -268,9 +275,9 @@ class BaseArchiver(object):
scu.GSL.release()
return filename
def get(self, archive_id: str, filename: str):
def get(self, archive_id: str, filename: str, dept_id: int = None):
"""Retreive data"""
self.initialize()
self.initialize(dept_id)
if not scu.is_valid_filename(filename):
log(f"""Archiver.get: invalid filename '{filename}'""")
raise ScoValueError("archive introuvable (déjà supprimée ?)")
@ -280,11 +287,11 @@ class BaseArchiver(object):
data = f.read()
return data
def get_archived_file(self, oid, archive_name, filename):
def get_archived_file(self, oid, archive_name, filename, dept_id: int = None):
"""Recupère les donnees du fichier indiqué et envoie au client.
Returns: Response
"""
archive_id = self.get_id_from_name(oid, archive_name)
archive_id = self.get_id_from_name(oid, archive_name, dept_id=dept_id)
data = self.get(archive_id, filename)
mime = mimetypes.guess_type(filename)[0]
if mime is None:
@ -298,7 +305,7 @@ class SemsArchiver(BaseArchiver):
BaseArchiver.__init__(self, archive_type="")
PVArchive = SemsArchiver()
PV_ARCHIVER = SemsArchiver()
# ----------------------------------------------------------------------------
@ -332,8 +339,10 @@ def do_formsemestre_archive(
formsemestre = FormSemestre.get_formsemestre(formsemestre_id)
res: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre)
sem_archive_id = formsemestre_id
archive_id = PVArchive.create_obj_archive(sem_archive_id, description)
date = PVArchive.get_archive_date(archive_id).strftime("%d/%m/%Y à %H:%M")
archive_id = PV_ARCHIVER.create_obj_archive(
sem_archive_id, description, formsemestre.dept_id
)
date = PV_ARCHIVER.get_archive_date(archive_id).strftime("%d/%m/%Y à %H:%M")
if not group_ids:
# tous les inscrits du semestre
@ -347,7 +356,12 @@ def do_formsemestre_archive(
# Tableau recap notes en XLS (pour tous les etudiants, n'utilise pas les groupes)
data, _ = gen_formsemestre_recapcomplet_excel(res, include_evaluations=True)
if data:
PVArchive.store(archive_id, "Tableau_moyennes" + scu.XLSX_SUFFIX, data)
PV_ARCHIVER.store(
archive_id,
"Tableau_moyennes" + scu.XLSX_SUFFIX,
data,
dept_id=formsemestre.dept_id,
)
# Tableau recap notes en HTML (pour tous les etudiants, n'utilise pas les groupes)
table_html, _, _ = gen_formsemestre_recapcomplet_html_table(
formsemestre, res, include_evaluations=True
@ -367,13 +381,17 @@ def do_formsemestre_archive(
html_sco_header.sco_footer(),
]
)
PVArchive.store(archive_id, "Tableau_moyennes.html", data)
PV_ARCHIVER.store(
archive_id, "Tableau_moyennes.html", data, dept_id=formsemestre.dept_id
)
# Bulletins en JSON
data = gen_formsemestre_recapcomplet_json(formsemestre_id, xml_with_decisions=True)
data_js = json.dumps(data, indent=1, cls=ScoDocJSONEncoder)
if data:
PVArchive.store(archive_id, "Bulletins.json", data_js)
PV_ARCHIVER.store(
archive_id, "Bulletins.json", data_js, dept_id=formsemestre.dept_id
)
# Décisions de jury, en XLS
if formsemestre.formation.is_apc():
response = jury_but_pv.pvjury_page_but(formsemestre_id, fmt="xls")
@ -383,17 +401,23 @@ def do_formsemestre_archive(
formsemestre_id, format="xls", publish=False
)
if data:
PVArchive.store(
PV_ARCHIVER.store(
archive_id,
"Decisions_Jury" + scu.XLSX_SUFFIX,
data,
dept_id=formsemestre.dept_id,
)
# Classeur bulletins (PDF)
data, _ = sco_bulletins_pdf.get_formsemestre_bulletins_pdf(
formsemestre_id, version=bul_version
)
if data:
PVArchive.store(archive_id, "Bulletins.pdf", data)
PV_ARCHIVER.store(
archive_id,
"Bulletins.pdf",
data,
dept_id=formsemestre.dept_id,
)
# Lettres individuelles (PDF):
data = sco_pv_lettres_inviduelles.pdf_lettres_individuelles(
formsemestre_id,
@ -403,7 +427,12 @@ def do_formsemestre_archive(
signature=signature,
)
if data:
PVArchive.store(archive_id, f"CourriersDecisions{groups_filename}.pdf", data)
PV_ARCHIVER.store(
archive_id,
f"CourriersDecisions{groups_filename}.pdf",
data,
dept_id=formsemestre.dept_id,
)
# PV de jury (PDF):
data = sco_pv_pdf.pvjury_pdf(
@ -419,7 +448,12 @@ def do_formsemestre_archive(
anonymous=anonymous,
)
if data:
PVArchive.store(archive_id, f"PV_Jury{groups_filename}.pdf", data)
PV_ARCHIVER.store(
archive_id,
f"PV_Jury{groups_filename}.pdf",
data,
dept_id=formsemestre.dept_id,
)
def formsemestre_archive(formsemestre_id, group_ids: list[int] = None):
@ -558,14 +592,21 @@ enregistrés et non modifiables, on peut les retrouver ultérieurement.
def formsemestre_list_archives(formsemestre_id):
"""Page listing archives"""
formsemestre = FormSemestre.get_formsemestre(formsemestre_id)
sem_archive_id = formsemestre_id
L = []
for archive_id in PVArchive.list_obj_archives(sem_archive_id):
for archive_id in PV_ARCHIVER.list_obj_archives(
sem_archive_id, dept_id=formsemestre.dept_id
):
a = {
"archive_id": archive_id,
"description": PVArchive.get_archive_description(archive_id),
"date": PVArchive.get_archive_date(archive_id),
"content": PVArchive.list_archive(archive_id),
"description": PV_ARCHIVER.get_archive_description(
archive_id, dept_id=formsemestre.dept_id
),
"date": PV_ARCHIVER.get_archive_date(archive_id),
"content": PV_ARCHIVER.list_archive(
archive_id, dept_id=formsemestre.dept_id
),
}
L.append(a)
@ -575,7 +616,7 @@ def formsemestre_list_archives(formsemestre_id):
else:
H.append("<ul>")
for a in L:
archive_name = PVArchive.get_archive_name(a["archive_id"])
archive_name = PV_ARCHIVER.get_archive_name(a["archive_id"])
H.append(
'<li>%s : <em>%s</em> (<a href="formsemestre_delete_archive?formsemestre_id=%s&archive_name=%s">supprimer</a>)<ul>'
% (
@ -602,7 +643,9 @@ def formsemestre_get_archived_file(formsemestre_id, archive_name, filename):
"""Send file to client."""
formsemestre: FormSemestre = FormSemestre.query.get_or_404(formsemestre_id)
sem_archive_id = formsemestre.id
return PVArchive.get_archived_file(sem_archive_id, archive_name, filename)
return PV_ARCHIVER.get_archived_file(
sem_archive_id, archive_name, filename, dept_id=formsemestre.dept_id
)
def formsemestre_delete_archive(formsemestre_id, archive_name, dialog_confirmed=False):
@ -617,7 +660,9 @@ def formsemestre_delete_archive(formsemestre_id, archive_name, dialog_confirmed=
)
)
sem_archive_id = formsemestre_id
archive_id = PVArchive.get_id_from_name(sem_archive_id, archive_name)
archive_id = PV_ARCHIVER.get_id_from_name(
sem_archive_id, archive_name, dept_id=formsemestre.dept_id
)
dest_url = url_for(
"notes.formsemestre_list_archives",
@ -628,7 +673,7 @@ def formsemestre_delete_archive(formsemestre_id, archive_name, dialog_confirmed=
if not dialog_confirmed:
return scu.confirm_dialog(
f"""<h2>Confirmer la suppression de l'archive du {
PVArchive.get_archive_date(archive_id).strftime("%d/%m/%Y %H:%M")
PV_ARCHIVER.get_archive_date(archive_id).strftime("%d/%m/%Y %H:%M")
} ?</h2>
<p>La suppression sera définitive.</p>
""",
@ -640,6 +685,6 @@ def formsemestre_delete_archive(formsemestre_id, archive_name, dialog_confirmed=
},
)
PVArchive.delete_archive(archive_id)
PV_ARCHIVER.delete_archive(archive_id, dept_id=formsemestre.dept_id)
flash("Archive supprimée")
return flask.redirect(dest_url)

View File

@ -52,7 +52,8 @@ class EtudsArchiver(sco_archives.BaseArchiver):
sco_archives.BaseArchiver.__init__(self, archive_type="docetuds")
EtudsArchive = EtudsArchiver()
# Global au processus, attention !
ETUDS_ARCHIVER = EtudsArchiver()
def can_edit_etud_archive(authuser):
@ -60,21 +61,21 @@ def can_edit_etud_archive(authuser):
return authuser.has_permission(Permission.ScoEtudAddAnnotations)
def etud_list_archives_html(etudid):
def etud_list_archives_html(etud: Identite):
"""HTML snippet listing archives"""
can_edit = can_edit_etud_archive(current_user)
etuds = sco_etud.get_etud_info(etudid=etudid)
if not etuds:
raise ScoValueError("étudiant inexistant")
etud = etuds[0]
etud_archive_id = etudid
etud_archive_id = etud.id
L = []
for archive_id in EtudsArchive.list_obj_archives(etud_archive_id):
for archive_id in ETUDS_ARCHIVER.list_obj_archives(
etud_archive_id, dept_id=etud.dept_id
):
a = {
"archive_id": archive_id,
"description": EtudsArchive.get_archive_description(archive_id),
"date": EtudsArchive.get_archive_date(archive_id),
"content": EtudsArchive.list_archive(archive_id),
"description": ETUDS_ARCHIVER.get_archive_description(
archive_id, dept_id=etud.dept_id
),
"date": ETUDS_ARCHIVER.get_archive_date(archive_id),
"content": ETUDS_ARCHIVER.list_archive(archive_id, dept_id=etud.dept_id),
}
L.append(a)
delete_icon = scu.icontag(
@ -85,7 +86,7 @@ def etud_list_archives_html(etudid):
)
H = ['<div class="etudarchive"><ul>']
for a in L:
archive_name = EtudsArchive.get_archive_name(a["archive_id"])
archive_name = ETUDS_ARCHIVER.get_archive_name(a["archive_id"])
H.append(
"""<li><span class ="etudarchive_descr" title="%s">%s</span>"""
% (a["date"].strftime("%d/%m/%Y %H:%M"), a["description"])
@ -93,14 +94,14 @@ def etud_list_archives_html(etudid):
for filename in a["content"]:
H.append(
"""<a class="stdlink etudarchive_link" href="etud_get_archived_file?etudid=%s&archive_name=%s&filename=%s">%s</a>"""
% (etudid, archive_name, filename, filename)
% (etud.id, archive_name, filename, filename)
)
if not a["content"]:
H.append("<em>aucun fichier !</em>")
if can_edit:
H.append(
'<span class="deletudarchive"><a class="smallbutton" href="etud_delete_archive?etudid=%s&archive_name=%s">%s</a></span>'
% (etudid, archive_name, delete_icon)
% (etud.id, archive_name, delete_icon)
)
else:
H.append('<span class="deletudarchive">' + delete_disabled_icon + "</span>")
@ -108,7 +109,7 @@ def etud_list_archives_html(etudid):
if can_edit:
H.append(
'<li class="addetudarchive"><a class="stdlink" href="etud_upload_file_form?etudid=%s">ajouter un fichier</a></li>'
% etudid
% etud.id
)
H.append("</ul></div>")
return "".join(H)
@ -121,12 +122,13 @@ def add_archives_info_to_etud_list(etuds):
for etud in etuds:
l = []
etud_archive_id = etud["etudid"]
for archive_id in EtudsArchive.list_obj_archives(etud_archive_id):
# Here, ETUDS_ARCHIVER will use g.dept_id
for archive_id in ETUDS_ARCHIVER.list_obj_archives(etud_archive_id):
l.append(
"%s (%s)"
% (
EtudsArchive.get_archive_description(archive_id),
EtudsArchive.list_archive(archive_id)[0],
ETUDS_ARCHIVER.get_archive_description(archive_id),
ETUDS_ARCHIVER.list_archive(archive_id)[0],
)
)
etud["etudarchive"] = ", ".join(l)
@ -197,8 +199,8 @@ def _store_etud_file_to_new_archive(
filesize = len(data)
if filesize < 10 or filesize > scu.CONFIG.ETUD_MAX_FILE_SIZE:
return False, f"Fichier archive '{filename}' de taille invalide ! ({filesize})"
archive_id = EtudsArchive.create_obj_archive(etud_archive_id, description)
EtudsArchive.store(archive_id, filename, data)
archive_id = ETUDS_ARCHIVER.create_obj_archive(etud_archive_id, description)
ETUDS_ARCHIVER.store(archive_id, filename, data)
return True, "ok"
@ -212,14 +214,16 @@ def etud_delete_archive(etudid, archive_name, dialog_confirmed=False):
raise ScoValueError("étudiant inexistant")
etud = etuds[0]
etud_archive_id = etud["etudid"]
archive_id = EtudsArchive.get_id_from_name(etud_archive_id, archive_name)
archive_id = ETUDS_ARCHIVER.get_id_from_name(
etud_archive_id, archive_name, dept_id=etud["dept_id"]
)
if not dialog_confirmed:
return scu.confirm_dialog(
"""<h2>Confirmer la suppression des fichiers ?</h2>
<p>Fichier associé le %s à l'étudiant %s</p>
<p>La suppression sera définitive.</p>"""
% (
EtudsArchive.get_archive_date(archive_id).strftime("%d/%m/%Y %H:%M"),
ETUDS_ARCHIVER.get_archive_date(archive_id).strftime("%d/%m/%Y %H:%M"),
etud["nomprenom"],
),
dest_url="",
@ -232,7 +236,7 @@ def etud_delete_archive(etudid, archive_name, dialog_confirmed=False):
parameters={"etudid": etudid, "archive_name": archive_name},
)
EtudsArchive.delete_archive(archive_id)
ETUDS_ARCHIVER.delete_archive(archive_id, dept_id=etud["dept_id"])
flash("Archive supprimée")
return flask.redirect(
url_for("scolar.ficheEtud", scodoc_dept=g.scodoc_dept, etudid=etudid)
@ -246,7 +250,9 @@ def etud_get_archived_file(etudid, archive_name, filename):
raise ScoValueError("étudiant inexistant")
etud = etuds[0]
etud_archive_id = etud["etudid"]
return EtudsArchive.get_archived_file(etud_archive_id, archive_name, filename)
return ETUDS_ARCHIVER.get_archived_file(
etud_archive_id, archive_name, filename, dept_id=etud["dept_id"]
)
# --- Upload d'un ensemble de fichiers (pour un groupe d'étudiants)

View File

@ -102,7 +102,7 @@ class JustificatifArchiver(BaseArchiver):
def save_justificatif(
self,
etudid: int,
etud: Identite,
filename: str,
data: bytes or str,
archive_name: str = None,
@ -113,17 +113,18 @@ class JustificatifArchiver(BaseArchiver):
Ajoute un fichier dans une archive "justificatif" pour l'etudid donné
Retourne l'archive_name utilisé
"""
self._set_dept(etudid)
if archive_name is None:
archive_id: str = self.create_obj_archive(
oid=etudid, description=description
oid=etud.id, description=description, dept_id=etud.dept_id
)
else:
archive_id: str = self.get_id_from_name(etudid, archive_name)
archive_id: str = self.get_id_from_name(
etud.id, archive_name, dept_id=etud.dept_id
)
fname: str = self.store(archive_id, filename, data)
fname: str = self.store(archive_id, filename, data, dept_id=etud.dept_id)
trace = Trace(self.get_obj_dir(etudid))
trace = Trace(self.get_obj_dir(etud.id, dept_id=etud.dept_id))
trace.set_trace(fname, mode="entry")
if user_id is not None:
trace.set_trace(fname, mode="user_id", current_user=user_id)
@ -132,7 +133,7 @@ class JustificatifArchiver(BaseArchiver):
def delete_justificatif(
self,
etudid: int,
etud: Identite,
archive_name: str,
filename: str = None,
has_trace: bool = True,
@ -140,92 +141,91 @@ class JustificatifArchiver(BaseArchiver):
"""
Supprime une archive ou un fichier particulier de l'archivage de l'étudiant donné
Si trace == True : sauvegarde le nom du/des fichier(s) supprimé(s) dans la trace de l'étudiant
Si trace == True : sauvegarde le nom du/des fichier(s) supprimé(s)
dans la trace de l'étudiant
"""
self._set_dept(etudid)
if str(etudid) not in self.list_oids():
raise ValueError(f"Aucune archive pour etudid[{etudid}]")
if str(etud.id) not in self.list_oids():
raise ValueError(f"Aucune archive pour etudid[{etud.id}]")
archive_id = self.get_id_from_name(etudid, archive_name)
archive_id = self.get_id_from_name(etud.id, archive_name, dept_id=etud.dept_id)
if filename is not None:
if filename not in self.list_archive(archive_id):
if filename not in self.list_archive(archive_id, dept_id=etud.dept_id):
raise ValueError(
f"filename {filename} inconnu dans l'archive archive_id[{archive_id}] -> etudid[{etudid}]"
f"""filename {filename} inconnu dans l'archive archive_id[{
archive_id}] -> etudid[{etud.id}]"""
)
path: str = os.path.join(self.get_obj_dir(etudid), archive_id, filename)
path: str = os.path.join(
self.get_obj_dir(etud.id, dept_id=etud.dept_id), archive_id, filename
)
if os.path.isfile(path):
if has_trace:
trace = Trace(self.get_obj_dir(etudid))
trace = Trace(self.get_obj_dir(etud.id, dept_id=etud.dept_id))
trace.set_trace(filename, mode="delete")
os.remove(path)
else:
if has_trace:
trace = Trace(self.get_obj_dir(etudid))
trace.set_trace(*self.list_archive(archive_id), mode="delete")
trace = Trace(self.get_obj_dir(etud.id, dept_id=etud.dept_id))
trace.set_trace(
*self.list_archive(archive_id, dept_id=etud.dept_id), mode="delete"
)
self.delete_archive(
os.path.join(
self.get_obj_dir(etudid),
self.get_obj_dir(etud.id, dept_id=etud.dept_id),
archive_id,
)
)
def list_justificatifs(
self, archive_name: str, etudid: int
self, archive_name: str, etud: Identite
) -> list[tuple[str, int]]:
"""
Retourne la liste des noms de fichiers dans l'archive donnée
"""
self._set_dept(etudid)
filenames: list[str] = []
archive_id = self.get_id_from_name(etudid, archive_name)
archive_id = self.get_id_from_name(etud.id, archive_name, dept_id=etud.dept_id)
filenames = self.list_archive(archive_id)
trace: Trace = Trace(self.get_obj_dir(etudid))
filenames = self.list_archive(archive_id, dept_id=etud.dept_id)
trace: Trace = Trace(self.get_obj_dir(etud.id, dept_id=etud.dept_id))
traced = trace.get_trace(filenames)
retour = [(key, value[2]) for key, value in traced.items()]
return retour
def get_justificatif_file(self, archive_name: str, etudid: int, filename: str):
def get_justificatif_file(self, archive_name: str, etud: Identite, filename: str):
"""
Retourne une réponse de téléchargement de fichier si le fichier existe
"""
self._set_dept(etudid)
archive_id: str = self.get_id_from_name(etudid, archive_name)
if filename in self.list_archive(archive_id):
return self.get_archived_file(etudid, archive_name, filename)
archive_id: str = self.get_id_from_name(
etud.id, archive_name, dept_id=etud.dept_id
)
if filename in self.list_archive(archive_id, dept_id=etud.dept_id):
return self.get_archived_file(
etud.id, archive_name, filename, dept_id=etud.dept_id
)
raise ScoValueError(
f"Fichier {filename} introuvable dans l'archive {archive_name}"
)
def _set_dept(self, etudid: int):
"""
Mets à jour le dept_id de l'archiver en fonction du département de l'étudiant
"""
etud: Identite = Identite.query.filter_by(id=etudid).first()
self.set_dept_id(etud.dept_id)
def remove_dept_archive(self, dept_id: int = None):
"""
Supprime toutes les archives d'un département (ou de tous les départements)
Supprime aussi les fichiers de trace
"""
self.set_dept_id(1)
self.initialize()
# juste pour récupérer .root, dept_id n'a pas d'importance
self.initialize(dept_id=1)
if dept_id is None:
rmtree(self.root, ignore_errors=True)
else:
rmtree(os.path.join(self.root, str(dept_id)), ignore_errors=True)
def get_trace(
self, etudid: int, *fnames: str
def get_trace( # XXX inutilisée ?
self, etud: Identite, *fnames: str
) -> dict[str, list[datetime, datetime]]:
"""Récupère la trace des justificatifs de l'étudiant"""
trace = Trace(self.get_obj_dir(etudid))
trace = Trace(self.get_obj_dir(etud.id, etud.dept_id))
return trace.get_trace(fnames)

View File

@ -85,7 +85,7 @@ class ApoCSVArchiver(sco_archives.BaseArchiver):
sco_archives.BaseArchiver.__init__(self, archive_type="apo_csv")
ApoCSVArchive = ApoCSVArchiver()
APO_CSV_ARCHIVER = ApoCSVArchiver()
# def get_sem_apo_archive(formsemestre_id):
@ -126,9 +126,9 @@ def apo_csv_store(csv_data: str, annee_scolaire, sem_id):
oid = f"{annee_scolaire}-{sem_id}"
description = f"""{str(apo_data.etape)};{annee_scolaire};{sem_id}"""
archive_id = ApoCSVArchive.create_obj_archive(oid, description)
archive_id = APO_CSV_ARCHIVER.create_obj_archive(oid, description)
csv_data_bytes = csv_data.encode(sco_apogee_reader.APO_OUTPUT_ENCODING)
ApoCSVArchive.store(archive_id, filename, csv_data_bytes)
APO_CSV_ARCHIVER.store(archive_id, filename, csv_data_bytes)
return apo_data.etape
@ -138,7 +138,7 @@ def apo_csv_list_stored_archives(annee_scolaire=None, sem_id=None, etapes=None):
:return: list of informations about stored CSV
[ { } ]
"""
oids = ApoCSVArchive.list_oids() # [ '2016-1', ... ]
oids = APO_CSV_ARCHIVER.list_oids() # [ '2016-1', ... ]
# filter
if annee_scolaire:
e = re.compile(str(annee_scolaire) + "-.+")
@ -149,9 +149,9 @@ def apo_csv_list_stored_archives(annee_scolaire=None, sem_id=None, etapes=None):
infos = [] # liste d'infos
for oid in oids:
archive_ids = ApoCSVArchive.list_obj_archives(oid)
archive_ids = APO_CSV_ARCHIVER.list_obj_archives(oid)
for archive_id in archive_ids:
description = ApoCSVArchive.get_archive_description(archive_id)
description = APO_CSV_ARCHIVER.get_archive_description(archive_id)
fs = tuple(description.split(";"))
if len(fs) == 3:
arch_etape_apo, arch_annee_scolaire, arch_sem_id = fs
@ -165,7 +165,7 @@ def apo_csv_list_stored_archives(annee_scolaire=None, sem_id=None, etapes=None):
"annee_scolaire": int(arch_annee_scolaire),
"sem_id": int(arch_sem_id),
"etape_apo": arch_etape_apo, # qui contient éventuellement le VDI
"date": ApoCSVArchive.get_archive_date(archive_id),
"date": APO_CSV_ARCHIVER.get_archive_date(archive_id),
}
)
infos.sort(key=lambda x: x["etape_apo"])
@ -185,7 +185,7 @@ def apo_csv_list_stored_etapes(annee_scolaire, sem_id=None, etapes=None):
def apo_csv_delete(archive_id):
"""Delete archived CSV"""
ApoCSVArchive.delete_archive(archive_id)
APO_CSV_ARCHIVER.delete_archive(archive_id)
def apo_csv_get_archive(etape_apo, annee_scolaire="", sem_id=""):
@ -209,7 +209,7 @@ def apo_csv_get(etape_apo="", annee_scolaire="", sem_id="") -> str:
"Etape %s non enregistree (%s, %s)" % (etape_apo, annee_scolaire, sem_id)
)
archive_id = info["archive_id"]
data = ApoCSVArchive.get(archive_id, etape_apo + ".csv")
data = APO_CSV_ARCHIVER.get(archive_id, etape_apo + ".csv")
# ce fichier a été archivé donc généré par ScoDoc
# son encodage est donc APO_OUTPUT_ENCODING
return data.decode(sco_apogee_reader.APO_OUTPUT_ENCODING)

View File

@ -448,7 +448,7 @@ def formsemestre_status_menubar(formsemestre: FormSemestre) -> str:
"title": "Documents archivés",
"endpoint": "notes.formsemestre_list_archives",
"args": {"formsemestre_id": formsemestre_id},
"enabled": sco_archives.PVArchive.list_obj_archives(formsemestre_id),
"enabled": sco_archives.PV_ARCHIVER.list_obj_archives(formsemestre_id),
},
]

View File

@ -441,7 +441,7 @@ def ficheEtud(etudid=None):
# Fichiers archivés:
info["fichiers_archive_htm"] = (
'<div class="fichetitre">Fichiers associés</div>'
+ sco_archives_etud.etud_list_archives_html(etudid)
+ sco_archives_etud.etud_list_archives_html(etud)
)
# Devenir de l'étudiant:

View File

@ -44,7 +44,7 @@ from app import models
from app.auth.models import User
from app.but import (
apc_edit_ue,
bulletin_but_court,
bulletin_but_court, # ne pas enlever: ajoute des routes !
cursus_but,
jury_edit_manual,
jury_but,

View File

@ -1,7 +1,7 @@
# -*- mode: python -*-
# -*- coding: utf-8 -*-
SCOVERSION = "9.6.24"
SCOVERSION = "9.6.25"
SCONAME = "ScoDoc"