ScoDoc/app/models/groups.py

364 lines
14 KiB
Python
Raw Permalink Normal View History

# -*- coding: UTF-8 -*
2022-07-20 15:07:31 +02:00
##############################################################################
# ScoDoc
2023-12-31 23:04:06 +01:00
# Copyright (c) 1999 - 2024 Emmanuel Viennet. All rights reserved.
2022-07-20 15:07:31 +02:00
# See LICENSE
##############################################################################
2022-07-20 15:07:31 +02:00
"""ScoDoc models: Groups & partitions
"""
from operator import attrgetter
from sqlalchemy.exc import IntegrityError
from app import db, log
2023-12-05 21:04:38 +01:00
from app.models import ScoDocModel, GROUPNAME_STR_LEN, SHORT_STR_LEN
2023-09-20 19:09:02 +02:00
from app.models.etudiants import Identite
2023-12-05 21:04:38 +01:00
from app.models.events import Scolog
from app.scodoc import sco_cache
2022-07-20 15:07:31 +02:00
from app.scodoc import sco_utils as scu
from app.scodoc.sco_exceptions import AccessDenied, ScoValueError
2023-12-22 15:21:07 +01:00
class Partition(ScoDocModel):
    """Partition: a way of splitting the students of a formsemestre into groups.

    A partition owns a set of `GroupDescr` (relationship `groups`); the pair
    (formsemestre_id, partition_name) is unique.
    """

    __table_args__ = (db.UniqueConstraint("formsemestre_id", "partition_name"),)

    id = db.Column(db.Integer, primary_key=True)
    partition_id = db.synonym("id")
    formsemestre_id = db.Column(
        db.Integer,
        db.ForeignKey("notes_formsemestre.id"),
        index=True,
    )
    # "TD", "TP", ... (NULL for 'all')
    partition_name = db.Column(db.String(SHORT_STR_LEN))
    # Numero = display order
    numero = db.Column(db.Integer, nullable=False, default=0)
    # Compute the rank?
    bul_show_rank = db.Column(
        db.Boolean(), nullable=False, default=False, server_default="false"
    )
    # Show this partition when listing the groups of a student?
    show_in_lists = db.Column(
        db.Boolean(), nullable=False, default=True, server_default="true"
    )
    # Editable (create/rename groups)? (false for the "parcours" groups)
    groups_editable = db.Column(
        db.Boolean(), nullable=False, default=True, server_default="true"
    )
    groups = db.relationship(
        "GroupDescr",
        backref=db.backref("partition", lazy=True),
        lazy="dynamic",
        cascade="all, delete-orphan",
        order_by="GroupDescr.numero, GroupDescr.group_name",
    )

    def __init__(self, **kwargs):
        """Create a partition; when no `numero` is given, assign the next one.

        The next numero is computed from the highest numero among *all*
        partitions (the query is not restricted to one formsemestre).
        """
        super().__init__(**kwargs)
        if self.numero is None:
            # Generate numero at creation time
            last_partition = Partition.query.order_by(Partition.numero.desc()).first()
            self.numero = last_partition.numero + 1 if last_partition else 1

    def __repr__(self):
        return f"""<{self.__class__.__name__} {self.id} "{self.partition_name or '(default)'}">"""

    @classmethod
    def check_name(
        cls, formsemestre: "FormSemestre", partition_name: str, existing=False
    ) -> bool:
        """Check if a partition named 'partition_name' can be created in the
        given formsemestre.

        The name must be a str whose stripped length is strictly between 0 and
        SHORT_STR_LEN. If `existing` is True, a name already used by a
        partition of the formsemestre is accepted.
        """
        if not isinstance(partition_name, str):
            return False
        if not 0 < len(partition_name.strip()) < SHORT_STR_LEN:
            return False
        if existing:
            return True
        # Name must not already be used in this formsemestre
        return all(p.partition_name != partition_name for p in formsemestre.partitions)

    @classmethod
    def formsemestre_remove_etud(cls, formsemestre_id: int, etud: "Identite"):
        "Remove the student from every group of every partition of this formsemestre"
        groups = GroupDescr.query.join(Partition).filter_by(
            formsemestre_id=formsemestre_id
        )
        for group in groups:
            group.remove_etud(etud)

    def is_parcours(self) -> bool:
        "True if this is the 'parcours' partition"
        return self.partition_name == scu.PARTITION_PARCOURS

    def to_dict(self, with_groups=False, str_keys: bool = False) -> dict:
        """Return the partition as a dict, with or without its groups.

        With `with_groups`, the key "groups" maps each group id to the group's
        dict (groups taken in (numero, group_name) order).
        If `str_keys`, the group ids used as keys are converted to strings
        (useful for JSON).
        """
        # Read a mapped attribute *before* copying __dict__: if the instance
        # has been expired (e.g. by a previous commit), this access reloads its
        # columns, so the snapshot below is complete.
        partition_id = self.id
        d = dict(self.__dict__)
        d["partition_id"] = partition_id
        d.pop("_sa_instance_state", None)
        d.pop("formsemestre", None)
        if with_groups:
            groups = sorted(self.groups, key=attrgetter("numero", "group_name"))
            # A dict rather than a list, for JSON
            d["groups"] = {
                (str(group.id) if str_keys else group.id): group.to_dict(
                    with_partition=False
                )
                for group in groups
            }
        return d

    def get_etud_group(self, etudid: int) -> "GroupDescr":
        "The group of this student in this partition, or None if they have none"
        query = (
            GroupDescr.query.filter_by(partition_id=self.id)
            .join(group_membership)
            .filter_by(etudid=etudid)
        )
        return query.first()

    def set_etud_group(self, etud: "Identite", group: "GroupDescr") -> bool:
        """Put the student `etud` in `group`, within this partition.

        If the student is already in another group of this partition, they are
        moved from that group to `group`. Commits the session.

        Raises ScoValueError if `group` does not belong to this partition or if
        the student is not among the students of the formsemestre.
        On IntegrityError, the session is rolled back and the error re-raised.

        Returns True if something changed, False if the student was already in
        this group.
        """
        if group.id not in (g.id for g in self.groups):
            partition_label = self.partition_name or "tous"
            raise ScoValueError(
                f"Le groupe {group.id} n'est pas dans la partition {partition_label}"
            )
        if etud.id not in (e.id for e in self.formsemestre.etuds):
            raise ScoValueError(
                f"""étudiant {etud.nomprenom} non inscrit au formsemestre du groupe {group.group_name}"""
            )
        try:
            existing_row = (
                db.session.query(group_membership)
                .filter_by(etudid=etud.id)
                .join(GroupDescr)
                .filter_by(partition_id=self.id)
                .first()
            )
            if existing_row:
                # Rows of group_membership are (etudid, group_id)
                existing_group_id = existing_row[1]
                if group.id == existing_group_id:
                    return False
                # Do the change through the ORM, otherwise high risk of blocking.
                # NOTE(review): the commit below precedes the modifications;
                # ordering kept as is.
                existing_group = db.session.get(GroupDescr, existing_group_id)
                db.session.commit()
                group.etuds.append(etud)
                existing_group.etuds.remove(etud)
                db.session.add(etud)
                db.session.add(existing_group)
                db.session.add(group)
            else:
                new_row = group_membership.insert().values(
                    etudid=etud.id, group_id=group.id
                )
                db.session.execute(new_row)
            db.session.commit()
        except IntegrityError:
            db.session.rollback()
            raise
        return True

    def create_group(self, group_name="", default=False) -> "GroupDescr":
        """Create a group in this partition, commit, and return it.

        The new group gets a numero one above the highest existing numero of
        the partition's groups (0 for the first group).

        Raises AccessDenied if the formsemestre does not allow changing groups,
        ValueError if the name is empty and `default` is false,
        ScoValueError if GroupDescr.check_name rejects the name.
        """
        if not self.formsemestre.can_change_groups():
            raise AccessDenied(
                "Vous n'avez pas le droit d'effectuer cette opération,\n"
                "ou bien le semestre est verrouillé !"
            )
        if group_name:
            group_name = group_name.strip()
        if not group_name and not default:
            raise ValueError("invalid group name: ()")
        if not GroupDescr.check_name(self, group_name, default=default):
            raise ScoValueError(
                f"Le groupe {group_name} existe déjà dans cette partition"
            )
        # A missing numero counts as 0; an empty partition starts at 0
        new_numero = max((g.numero or 0 for g in self.groups), default=-1) + 1
        group = GroupDescr(partition=self, group_name=group_name, numero=new_numero)
        db.session.add(group)
        db.session.commit()
        log(f"create_group: created group_id={group.id}")
        return group
2023-12-22 15:21:07 +01:00
class GroupDescr(ScoDocModel):
    """Description of a group belonging to a partition.

    The pair (partition_id, group_name) is unique. Students are linked through
    the `group_membership` association table (relationship `etuds`).
    """

    __tablename__ = "group_descr"
    __table_args__ = (db.UniqueConstraint("partition_id", "group_name"),)

    id = db.Column(db.Integer, primary_key=True)
    group_id = db.synonym("id")
    partition_id = db.Column(db.Integer, db.ForeignKey("partition.id"))
    group_name = db.Column(db.String(GROUPNAME_STR_LEN))
    """group name: "A", "C2", ... (NULL for 'all')"""
    edt_id: str | None = db.Column(db.Text(), index=True, nullable=True)
    "timetable identifier (uniqueness not enforced)"
    numero = db.Column(db.Integer, nullable=False, default=0)
    "Numero = display order"

    etuds = db.relationship(
        "Identite",
        secondary="group_membership",
        lazy="dynamic",
    )

    def __repr__(self):
        return (
            f"""<{self.__class__.__name__} {self.id} "{self.group_name or '(tous)'}">"""
        )

    def get_nom_with_part(self, default="-") -> str:
        """Name prefixed by the partition name, e.g. 'TD A'.

        For a group of the partition whose name is None (the default 'all'
        partition), return `default` instead.
        """
        partition_name = self.partition.partition_name
        if partition_name is None:
            return default
        return f"{partition_name or ''} {self.group_name or '-'}"

    def to_dict(self, with_partition=True) -> dict:
        """Return the group as a dict; with `with_partition`, the key
        "partition" holds the partition's dict (without its groups)."""
        partition_dict = (
            self.partition.to_dict(with_groups=False) if with_partition else None
        )
        # Read a mapped attribute before copying __dict__: if the instance has
        # been expired (e.g. by a previous commit), this access reloads its
        # columns, so the snapshot below is complete even without partition.
        _ = self.id
        d = dict(self.__dict__)
        d.pop("_sa_instance_state", None)
        if with_partition:
            d["partition"] = partition_dict
        return d

    def get_edt_ids(self) -> list[str]:
        """Normalized ids for the timetable; when `edt_id` yields no id, fall
        back on the ScoDoc group name."""
        # NOTE(review): when group_name is None (default group) and there is no
        # edt_id, None is handed to scu.normalize_edt_id — confirm it copes.
        ids = scu.split_id(self.edt_id) or [self.group_name]
        return [scu.normalize_edt_id(x) for x in ids]

    def get_nb_inscrits(self) -> int:
        """Number of students who are in this group and have an inscription to
        the partition's formsemestre.

        Needed because when a student is un-enrolled, their group memberships
        are kept to make a possible re-enrolment easier.
        """
        # Local import, as in the original code (kept out of module level)
        from app.models.formsemestre import FormSemestreInscription

        members = Identite.query.join(group_membership).filter_by(group_id=self.id)
        enrolled = members.join(FormSemestreInscription).filter_by(
            formsemestre_id=self.partition.formsemestre.id
        )
        return enrolled.count()

    @classmethod
    def check_name(
        cls, partition: "Partition", group_name: str, existing=False, default=False
    ) -> bool:
        """Check if a group named 'group_name' can be created in the given partition.

        The name must be a str. Unless `default` is true, its stripped length
        must be strictly between 0 and GROUPNAME_STR_LEN.
        If `existing` is True, a name already used in the partition is accepted.
        """
        if not isinstance(group_name, str):
            return False
        if not default and not 0 < len(group_name.strip()) < GROUPNAME_STR_LEN:
            return False
        if existing:
            return True
        # Name must not already be used in this partition
        return all(g.group_name != group_name for g in partition.groups)

    def set_name(self, group_name: str, dest_url: str | None = None):
        """Rename the group.

        Checks permission (the formsemestre must allow changing groups and the
        partition must be groups_editable), commits the session and
        invalidates the formsemestre cache.
        `dest_url` is passed to the ScoValueError raised on an invalid name.

        Raises AccessDenied, ValueError (default group, whose name is None),
        or ScoValueError (empty name, or name rejected by check_name).
        """
        if not self.partition.formsemestre.can_change_groups():
            raise AccessDenied("Vous n'avez pas le droit d'effectuer cette opération !")
        if self.group_name is None:
            raise ValueError("can't set a name to default group")
        if not self.partition.groups_editable:
            raise AccessDenied("Partition non éditable")
        if group_name:
            group_name = group_name.strip()
        if not group_name:
            raise ScoValueError("nom de groupe vide !", dest_url=dest_url)
        # Keeping the current name is always allowed
        is_rename = group_name != self.group_name
        if is_rename and not GroupDescr.check_name(self.partition, group_name):
            raise ScoValueError(
                "Le nom de groupe existe déjà dans la partition", dest_url=dest_url
            )
        self.group_name = group_name
        db.session.add(self)
        db.session.commit()
        sco_cache.invalidate_formsemestre(
            formsemestre_id=self.partition.formsemestre_id
        )

    def set_edt_id(self, edt_id: str):
        """Set edt_id (a blank string is stored as None). Check permission.
        Commit session."""
        if not self.partition.formsemestre.can_change_groups():
            raise AccessDenied("Vous n'avez pas le droit d'effectuer cette opération !")
        if isinstance(edt_id, str):
            edt_id = edt_id.strip() or None
        self.edt_id = edt_id
        db.session.add(self)
        db.session.commit()

    def remove_etud(self, etud: "Identite"):
        """Remove the student from this group if they belong to it (do nothing
        otherwise).

        On removal: commit, log the event, update the parcours inscriptions
        when this is the parcours partition, and invalidate the formsemestre
        cache.
        """
        if etud not in self.etuds:
            return
        self.etuds.remove(etud)
        db.session.commit()
        Scolog.logdb(
            method="group_remove_etud",
            etudid=etud.id,
            msg=f"Retrait du groupe {self.group_name} de {self.partition.partition_name}",
            commit=True,
        )
        # Update parcours
        if self.partition.partition_name == scu.PARTITION_PARCOURS:
            self.partition.formsemestre.update_inscriptions_parcours_from_groups(
                etudid=etud.id
            )
        sco_cache.invalidate_formsemestre(self.partition.formsemestre_id)
# Association table between students and groups: one row per
# (etudid, group_id) pair, each pair being unique.
# NOTE: keep the column order (etudid, then group_id):
# Partition.set_etud_group reads the group id positionally (index 1)
# from rows queried on this table.
group_membership = db.Table(
"group_membership",
# Student; the foreign key is declared with ON DELETE CASCADE
db.Column("etudid", db.Integer, db.ForeignKey("identite.id", ondelete="CASCADE")),
# Group (no ondelete rule is declared on this foreign key)
db.Column("group_id", db.Integer, db.ForeignKey("group_descr.id")),
db.UniqueConstraint("etudid", "group_id"),
)
# class GroupMembership(db.Model):
# """Association groupe / étudiant"""
# __tablename__ = "group_membership"
# __table_args__ = (db.UniqueConstraint("etudid", "group_id"),)
# id = db.Column(db.Integer, primary_key=True)
# etudid = db.Column(db.Integer, db.ForeignKey("identite.id", ondelete="CASCADE"))
# group_id = db.Column(db.Integer, db.ForeignKey("group_descr.id"))