# -*- coding: UTF-8 -* """Gestion de l'assiduité (assiduités + justificatifs) """ from datetime import datetime from flask_sqlalchemy.query import Query from app import db, log, g, set_sco_dept from app.models import ModuleImpl, Module, Scolog, FormSemestre, FormSemestreInscription from app.models.etudiants import Identite from app.auth.models import User from app.scodoc import sco_abs_notification from app.scodoc.sco_exceptions import ScoValueError from app.scodoc.sco_utils import ( EtatAssiduite, EtatJustificatif, localize_datetime, is_assiduites_module_forced, ) class Assiduite(db.Model): """ Représente une assiduité: - une plage horaire lié à un état et un étudiant - un module si spécifiée - une description si spécifiée """ __tablename__ = "assiduites" id = db.Column(db.Integer, primary_key=True, nullable=False) assiduite_id = db.synonym("id") date_debut = db.Column( db.DateTime(timezone=True), server_default=db.func.now(), nullable=False ) date_fin = db.Column( db.DateTime(timezone=True), server_default=db.func.now(), nullable=False ) moduleimpl_id = db.Column( db.Integer, db.ForeignKey("notes_moduleimpl.id", ondelete="SET NULL"), ) etudid = db.Column( db.Integer, db.ForeignKey("identite.id", ondelete="CASCADE"), index=True, nullable=False, ) etat = db.Column(db.Integer, nullable=False) description = db.Column(db.Text) entry_date = db.Column(db.DateTime(timezone=True), server_default=db.func.now()) user_id = db.Column( db.Integer, db.ForeignKey("user.id", ondelete="SET NULL"), nullable=True, ) est_just = db.Column(db.Boolean, server_default="false", nullable=False) external_data = db.Column(db.JSON, nullable=True) # Déclare la relation "joined" car on va très souvent vouloir récupérer # l'étudiant en même tant que l'assiduité (perf.: évite nouvelle requete SQL) etudiant = db.relationship("Identite", back_populates="assiduites", lazy="joined") # En revanche, user est rarement accédé: user = db.relationship( "User", backref=db.backref( "assiduites", lazy="select", order_by="Assiduite.entry_date" ), lazy="select", ) def to_dict(self, format_api=True) -> dict: """Retourne la représentation json de l'assiduité""" etat = self.etat user: User | None = None if format_api: # format api utilise les noms "present,absent,retard" au lieu des int etat = EtatAssiduite.inverse().get(self.etat).name if self.user_id is not None: user = db.session.get(User, self.user_id) data = { "assiduite_id": self.id, "etudid": self.etudid, "code_nip": self.etudiant.code_nip, "moduleimpl_id": self.moduleimpl_id, "date_debut": self.date_debut, "date_fin": self.date_fin, "etat": etat, "desc": self.description, "entry_date": self.entry_date, "user_id": None if user is None else user.id, # l'uid "user_name": None if user is None else user.user_name, # le login "user_nom_complet": None if user is None else user.get_nomcomplet(), # "Marie Dupont" "est_just": self.est_just, "external_data": self.external_data, } return data def __str__(self) -> str: "chaine pour journaux et debug (lisible par humain français)" try: etat_str = EtatAssiduite(self.etat).name.lower().capitalize() except ValueError: etat_str = "Invalide" return f"""{etat_str} { "just." if self.est_just else "non just." } de { self.date_debut.strftime("%d/%m/%Y %Hh%M") } à { self.date_fin.strftime("%d/%m/%Y %Hh%M") }""" @classmethod def create_assiduite( cls, etud: Identite, date_debut: datetime, date_fin: datetime, etat: EtatAssiduite, moduleimpl: ModuleImpl = None, description: str = None, entry_date: datetime = None, user_id: int = None, est_just: bool = False, external_data: dict = None, notify_mail=False, ) -> "Assiduite": """Créer une nouvelle assiduité pour l'étudiant. Les datetime doivent être en timzone serveur. Raises ScoValueError en cas de conflit ou erreur. """ if date_debut.tzinfo is None: log( f"Warning: create_assiduite: date_debut without timezone ({date_debut})" ) if date_fin.tzinfo is None: log(f"Warning: create_assiduite: date_fin without timezone ({date_fin})") # Vérification de non duplication des périodes assiduites: Query = etud.assiduites if is_period_conflicting(date_debut, date_fin, assiduites, Assiduite): log( f"create_assiduite: period_conflicting etudid={etud.id} date_debut={date_debut} date_fin={date_fin}" ) raise ScoValueError( "Duplication: la période rentre en conflit avec une plage enregistrée" ) if not est_just: est_just = ( len( get_justifs_from_date(etud.etudid, date_debut, date_fin, valid=True) ) > 0 ) moduleimpl_id = None if moduleimpl is not None: # Vérification de l'inscription de l'étudiant if moduleimpl.est_inscrit(etud): moduleimpl_id = moduleimpl.id else: raise ScoValueError("L'étudiant n'est pas inscrit au module") elif not ( external_data is not None and external_data.get("module") is not None ): # Vérification si module forcé formsemestre: FormSemestre = get_formsemestre_from_data( {"etudid": etud.id, "date_debut": date_debut, "date_fin": date_fin} ) force: bool if formsemestre: force = is_assiduites_module_forced(formsemestre_id=formsemestre.id) else: force = is_assiduites_module_forced(dept_id=etud.dept_id) if force: raise ScoValueError("Module non renseigné") nouv_assiduite = Assiduite( date_debut=date_debut, date_fin=date_fin, description=description, entry_date=entry_date, est_just=est_just, etat=etat, etudiant=etud, external_data=external_data, moduleimpl_id=moduleimpl_id, user_id=user_id, ) db.session.add(nouv_assiduite) db.session.flush() log(f"create_assiduite: {etud.id} id={nouv_assiduite.id} {nouv_assiduite}") Scolog.logdb( method="create_assiduite", etudid=etud.id, msg=f"assiduité: {nouv_assiduite}", ) if notify_mail and etat == EtatAssiduite.ABSENT: sco_abs_notification.abs_notify(etud.id, nouv_assiduite.date_debut) return nouv_assiduite def set_moduleimpl(self, moduleimpl_id: int | str) -> bool: """TODO""" # je ne comprend pas cette fonction WIP # moduleimpl_id peut être == "autre", ce qui plante # ci-dessous un fix temporaire en attendant explication de @iziram try: moduleimpl_id_int = int(moduleimpl_id) except ValueError as exc: raise ScoValueError("invalid moduleimpl_id") from exc # /fix moduleimpl: ModuleImpl = ModuleImpl.query.get(moduleimpl_id_int) if moduleimpl is not None: # Vérification de l'inscription de l'étudiant if moduleimpl.est_inscrit(self.etudiant): self.moduleimpl_id = moduleimpl.id else: raise ScoValueError("L'étudiant n'est pas inscrit au module") elif isinstance(moduleimpl_id, str): if self.external_data is None: self.external_data = {"module": moduleimpl_id} else: self.external_data["module"] = moduleimpl_id self.moduleimpl_id = None else: # Vérification si module forcé formsemestre: FormSemestre = get_formsemestre_from_data( { "etudid": self.etudid, "date_debut": self.date_debut, "date_fin": self.date_fin, } ) force: bool if formsemestre: force = is_assiduites_module_forced(formsemestre_id=formsemestre.id) else: force = is_assiduites_module_forced(dept_id=self.etudiant.dept_id) if force: raise ScoValueError("Module non renseigné") return True def supprime(self): "Supprime l'assiduité. Log et commit." from app.scodoc import sco_assiduites as scass if g.scodoc_dept is None and self.etudiant.dept_id is not None: # route sans département set_sco_dept(self.etudiant.departement.acronym) obj_dict: dict = self.to_dict() # Suppression de l'objet et LOG log(f"delete_assidutite: {self.etudiant.id} {self}") Scolog.logdb( method="delete_assiduite", etudid=self.etudiant.id, msg=f"Assiduité: {self}", ) db.session.delete(self) db.session.commit() # Invalidation du cache scass.simple_invalidate_cache(obj_dict) def get_formsemestre(self) -> FormSemestre: """Le formsemestre associé. Attention: en cas d'inscription multiple prend arbitrairement l'un des semestres. A utiliser avec précaution ! """ return get_formsemestre_from_data(self.to_dict()) def get_module(self, traduire: bool = False) -> int | str: "TODO" if self.moduleimpl_id is not None: if traduire: modimpl: ModuleImpl = ModuleImpl.query.get(self.moduleimpl_id) mod: Module = Module.query.get(modimpl.module_id) return f"{mod.code} {mod.titre}" elif self.external_data is not None and "module" in self.external_data: return ( "Tout module" if self.external_data["module"] == "Autre" else self.external_data["module"] ) return "Non spécifié" if traduire else None class Justificatif(db.Model): """ Représente un justificatif: - une plage horaire lié à un état et un étudiant - une raison si spécifiée - un fichier si spécifié """ __tablename__ = "justificatifs" id = db.Column(db.Integer, primary_key=True) justif_id = db.synonym("id") date_debut = db.Column( db.DateTime(timezone=True), server_default=db.func.now(), nullable=False ) date_fin = db.Column( db.DateTime(timezone=True), server_default=db.func.now(), nullable=False ) etudid = db.Column( db.Integer, db.ForeignKey("identite.id", ondelete="CASCADE"), index=True, nullable=False, ) etat = db.Column( db.Integer, nullable=False, ) entry_date = db.Column(db.DateTime(timezone=True), server_default=db.func.now()) "date de création de l'élément: date de saisie" # pourrait devenir date de dépot au secrétariat, si différente user_id = db.Column( db.Integer, db.ForeignKey("user.id", ondelete="SET NULL"), nullable=True, index=True, ) raison = db.Column(db.Text()) # Archive_id -> sco_archives_justificatifs.py fichier = db.Column(db.Text()) # Déclare la relation "joined" car on va très souvent vouloir récupérer # l'étudiant en même tant que le justificatif (perf.: évite nouvelle requete SQL) etudiant = db.relationship( "Identite", back_populates="justificatifs", lazy="joined" ) external_data = db.Column(db.JSON, nullable=True) def to_dict(self, format_api: bool = False) -> dict: """transformation de l'objet en dictionnaire sérialisable""" etat = self.etat username = self.user_id if format_api: etat = EtatJustificatif.inverse().get(self.etat).name if self.user_id is not None: user: User = db.session.get(User, self.user_id) if user is None: username = "Non renseigné" else: username = user.get_prenomnom() data = { "justif_id": self.justif_id, "etudid": self.etudid, "code_nip": self.etudiant.code_nip, "date_debut": self.date_debut, "date_fin": self.date_fin, "etat": etat, "raison": self.raison, "fichier": self.fichier, "entry_date": self.entry_date, "user_id": username, "external_data": self.external_data, } return data def __repr__(self) -> str: "chaine pour journaux et debug (lisible par humain français)" try: etat_str = EtatJustificatif(self.etat).name except ValueError: etat_str = "Invalide" return f"""Justificatif id={self.id} {etat_str} de { self.date_debut.strftime("%d/%m/%Y %Hh%M") } à { self.date_fin.strftime("%d/%m/%Y %Hh%M") }""" @classmethod def create_justificatif( cls, etud: Identite, date_debut: datetime, date_fin: datetime, etat: EtatJustificatif, raison: str = None, entry_date: datetime = None, user_id: int = None, external_data: dict = None, ) -> "Justificatif": """Créer un nouveau justificatif pour l'étudiant""" nouv_justificatif = Justificatif( date_debut=date_debut, date_fin=date_fin, etat=etat, etudiant=etud, raison=raison, entry_date=entry_date, user_id=user_id, external_data=external_data, ) db.session.add(nouv_justificatif) log(f"create_justificatif: etudid={etud.id} {nouv_justificatif}") Scolog.logdb( method="create_justificatif", etudid=etud.id, msg=f"justificatif: {nouv_justificatif}", ) return nouv_justificatif def supprime(self): "Supprime le justificatif. Log et commit." from app.scodoc import sco_assiduites as scass from app.scodoc.sco_archives_justificatifs import JustificatifArchiver # Récupération de l'archive du justificatif archive_name: str = self.fichier if archive_name is not None: # Si elle existe : on essaye de la supprimer archiver: JustificatifArchiver = JustificatifArchiver() try: archiver.delete_justificatif(self.etudiant, archive_name) except ValueError: pass if g.scodoc_dept is None and self.etudiant.dept_id is not None: # route sans département set_sco_dept(self.etudiant.departement.acronym) # On invalide le cache scass.simple_invalidate_cache(self.to_dict()) # Suppression de l'objet et LOG log(f"delete_justificatif: {self.etudiant.id} {self}") Scolog.logdb( method="delete_justificatif", etudid=self.etudiant.id, msg=f"Justificatif: {self}", ) db.session.delete(self) db.session.commit() # On actualise les assiduités justifiées de l'étudiant concerné compute_assiduites_justified( self.etudid, Justificatif.query.filter_by(etudid=self.etudid).all(), True, ) def is_period_conflicting( date_debut: datetime, date_fin: datetime, collection: Query, collection_cls: Assiduite | Justificatif, ) -> bool: """ Vérifie si une date n'entre pas en collision avec les justificatifs ou assiduites déjà présentes """ # On s'assure que les dates soient avec TimeZone date_debut = localize_datetime(date_debut) date_fin = localize_datetime(date_fin) count: int = collection.filter( collection_cls.date_debut < date_fin, collection_cls.date_fin > date_debut ).count() return count > 0 def compute_assiduites_justified( etudid: int, justificatifs: list[Justificatif] = None, reset: bool = False ) -> list[int]: """ Args: etudid (int): l'identifiant de l'étudiant justificatifs (list[Justificatif]): La liste des justificatifs qui seront utilisés reset (bool, optional): remet à false les assiduites non justifiés. Defaults to False. Returns: list[int]: la liste des assiduités qui ont été justifiées. """ # Si on ne donne pas de justificatifs on prendra par défaut tous les justificatifs de l'étudiant if justificatifs is None: justificatifs: list[Justificatif] = Justificatif.query.filter_by( etudid=etudid ).all() # On ne prend que les justificatifs valides justificatifs = [j for j in justificatifs if j.etat == EtatJustificatif.VALIDE] # On récupère les assiduités de l'étudiant assiduites: Assiduite = Assiduite.query.filter_by(etudid=etudid) assiduites_justifiees: list[int] = [] for assi in assiduites: # On ne justifie pas les Présences if assi.etat == EtatAssiduite.PRESENT: continue # On récupère les justificatifs qui justifient l'assiduité `assi` assi_justificatifs = Justificatif.query.filter( Justificatif.etudid == assi.etudid, Justificatif.date_debut <= assi.date_debut, Justificatif.date_fin >= assi.date_fin, Justificatif.etat == EtatJustificatif.VALIDE, ).all() # Si au moins un justificatif possède une période qui couvre l'assiduité if any( assi.date_debut >= j.date_debut and assi.date_fin <= j.date_fin for j in justificatifs + assi_justificatifs ): # On justifie l'assiduité # On ajoute l'id de l'assiduité à la liste des assiduités justifiées assi.est_just = True assiduites_justifiees.append(assi.assiduite_id) db.session.add(assi) elif reset: # Si le paramètre reset est Vrai alors les assiduités non justifiées # sont remise en "non justifiée" assi.est_just = False db.session.add(assi) # On valide la session db.session.commit() # On renvoie la liste des assiduite_id des assiduités justifiées return assiduites_justifiees def get_assiduites_justif(assiduite_id: int, long: bool) -> list[int | dict]: """ get_assiduites_justif Récupération des justificatifs d'une assiduité Args: assiduite_id (int): l'identifiant de l'assiduité long (bool): Retourner des dictionnaires à la place des identifiants des justificatifs Returns: list[int | dict]: La liste des justificatifs (par défaut uniquement les identifiants, sinon les dict si long est vrai) """ assi: Assiduite = Assiduite.query.get_or_404(assiduite_id) return get_justifs_from_date(assi.etudid, assi.date_debut, assi.date_fin, long) def get_justifs_from_date( etudid: int, date_debut: datetime, date_fin: datetime, long: bool = False, valid: bool = False, ) -> list[int | dict]: """ get_justifs_from_date Récupération des justificatifs couvrant une période pour un étudiant donné Args: etudid (int): l'identifiant de l'étudiant date_debut (datetime): la date de début (datetime avec timezone) date_fin (datetime): la date de fin (datetime avec timezone) long (bool, optional): Définition de la sortie. Vrai pour avoir les dictionnaires des justificatifs. Faux pour avoir uniquement les identifiants Defaults to False. valid (bool, optional): Filtre pour n'avoir que les justificatifs valide. Si vrai : le retour ne contiendra que des justificatifs valides Sinon le retour contiendra tout type de justificatifs Defaults to False. Returns: list[int | dict]: La liste des justificatifs (par défaut uniquement les identifiants, sinon les dict si long est vrai) """ # On récupère les justificatifs d'un étudiant couvrant la période donnée justifs: Query = Justificatif.query.filter( Justificatif.etudid == etudid, Justificatif.date_debut <= date_debut, Justificatif.date_fin >= date_fin, ) # si valide est vrai alors on filtre pour n'avoir que les justificatifs valide if valid: justifs = justifs.filter(Justificatif.etat == EtatJustificatif.VALIDE) # On renvoie la liste des id des justificatifs si long est Faux, # sinon on renvoie les dicts des justificatifs if long: return [j.to_dict(True) for j in justifs] return [j.justif_id for j in justifs] def get_formsemestre_from_data(data: dict[str, datetime | int]) -> FormSemestre: """ get_formsemestre_from_data récupère un formsemestre en fonction des données passées Si l'étudiant est inscrit à plusieurs formsemestre, prend le premier. Args: data (dict[str, datetime | int]): Une représentation simplifiée d'une assiduité ou d'un justificatif data = { "etudid" : int, "date_debut": datetime (tz), "date_fin": datetime (tz), } Returns: FormSemestre: Le formsemestre trouvé ou None """ return ( FormSemestre.query.join( FormSemestreInscription, FormSemestre.id == FormSemestreInscription.formsemestre_id, ) .filter( data["date_debut"] <= FormSemestre.date_fin, data["date_fin"] >= FormSemestre.date_debut, FormSemestreInscription.etudid == data["etudid"], ) .first() )