This commit is contained in:
Sébastien Lehmann 2022-07-24 22:06:44 +02:00
commit 344490f14d
9 changed files with 209 additions and 42 deletions

View File

@ -28,6 +28,7 @@ from functools import wraps
from flask import g
from flask_httpauth import HTTPBasicAuth, HTTPTokenAuth
import flask_login
from flask_login import current_user
from app import log
@ -61,6 +62,7 @@ def verify_token(token) -> User:
"""
user = User.check_token(token) if token else None
flask_login.login_user(user)
g.current_user = user
return user

View File

@ -8,7 +8,9 @@
API : accès aux étudiants
"""
from flask import jsonify
from flask import g, jsonify
from flask_login import current_user
from sqlalchemy import or_
import app
from app.api import bp
@ -27,33 +29,68 @@ from app.scodoc.sco_permissions import Permission
@permission_required_api(Permission.ScoView, Permission.APIView)
def etudiants_courants(long=False):
"""
Liste des étudiants inscrits dans un formsemestre actuellement en cours.
La liste des étudiants des semestres "courants" (tous département)
(date du jour comprise dans la période couverte par le sem.)
dans lesquels l'utilisateur a le rôle APIView (donc tous si le dept du
rôle est None).
Exemple de résultat :
[
{
"id": 1,
"nip": 1,
"nom": "MOREL",
"prenom": "JACQUES",
"civilite": "X",
},
{
"id": 2,
"nip": 2,
"nom": "GILLES",
"prenom": "MAXIME",
"civilite": "X",
},
{
"id": 1234,
"nip": "12345678",
"ine": null,
"nom": "JOHN",
"nom_usuel": None,
"prenom": "DEUF",
"civilite": "M",
}
...
]
En format "long":
{
"boursier": True,
"civilite": "F",
"code_ine": "AP987654",
"code_nip": "1234567",
"codepostaldomicile": "92800",
"date_naissance": "21/06/2000",
"dept_acronym": "CJ",
"dept_id": 1,
"dept_naissance": "092",
"description": "infos portail",
"domicile": "Plaza Condell",
"email": "jeanne.dupont@xxx.fr",
"emailperso": "",
"etudid": 4853,
"id": 4863,
"lieu_naissance": "SEVRES",
"nationalite": "",
"nom": "DUPONT",
"nomprenom": "Mme Jeanne Dupont",
"paysdomicile": "FRANCE",
"prenom": "JEANNE",
"telephone": "0102030405",
"telephonemobile": "",
"typeadresse": "domicile",
"villedomicile": "VALPARAISO",
}
"""
allowed_depts = current_user.get_depts_with_permission(
Permission.APIView | Permission.ScoView
)
etuds = Identite.query.filter(
Identite.id == FormSemestreInscription.etudid,
FormSemestreInscription.formsemestre_id == FormSemestre.id,
FormSemestre.date_debut <= app.db.func.now(),
FormSemestre.date_fin >= app.db.func.now(),
)
if not None in allowed_depts:
# restreint aux départements autorisés:
etuds = etuds.join(Departement).filter(
or_(Departement.acronym == acronym for acronym in allowed_depts)
)
if long:
data = [etud.to_dict_bul(include_urls=False) for etud in etuds]
else:

View File

@ -7,12 +7,13 @@
"""
ScoDoc 9 API : partitions
"""
from flask import abort, jsonify, request
from flask import jsonify, request
import app
from app import db, log
from app.api import bp
from app.api.auth import permission_required_api
from app.api.errors import error_response
from app.models import FormSemestre, FormSemestreInscription, Identite
from app.models import GroupDescr, Partition
from app.models.groups import group_membership
@ -112,7 +113,7 @@ def etud_in_group_query(group_id: int):
"""Etudiants du groupe, filtrés par état"""
etat = request.args.get("etat")
if etat not in {scu.INSCRIT, scu.DEMISSION, scu.DEF}:
abort(404, "etat invalid")
return error_response(404, "etat: valeur invalide")
group = GroupDescr.query.get_or_404(group_id)
query = (
Identite.query.join(FormSemestreInscription)
@ -131,7 +132,7 @@ def set_etud_group(etudid: int, group_id: int):
etud = Identite.query.get_or_404(etudid)
group = GroupDescr.query.get_or_404(group_id)
if etud.id not in {e.id for e in group.partition.formsemestre.etuds}:
abort(404, "etud non inscrit au formsemestre du groupe")
return error_response(404, "etud non inscrit au formsemestre du groupe")
groups = (
GroupDescr.query.filter_by(partition_id=group.partition.id)
.join(group_membership)
@ -180,13 +181,13 @@ def group_create(partition_id: int):
"""
partition: Partition = Partition.query.get_or_404(partition_id)
if not partition.groups_editable:
abort(404, "partition non editable")
return error_response(404, "partition non editable")
data = request.get_json(force=True) # may raise 400 Bad Request
group_name = data.get("group_name")
if group_name is None:
abort(404, "missing group name or invalid data format")
return error_response(404, "missing group name or invalid data format")
if not GroupDescr.check_name(partition, group_name):
abort(404, "invalid group_name")
return error_response(404, "invalid group_name")
group_name = group_name.strip()
group = GroupDescr(group_name=group_name, partition_id=partition_id)
@ -204,7 +205,7 @@ def group_delete(group_id: int):
"""Suppression d'un groupe"""
group = GroupDescr.query.get_or_404(group_id)
if not group.partition.groups_editable:
abort(404, "partition non editable")
return error_response(404, "partition non editable")
formsemestre_id = group.partition.formsemestre_id
log(f"deleting {group}")
db.session.delete(group)
@ -220,12 +221,12 @@ def group_edit(group_id: int):
"""Edit a group"""
group: GroupDescr = GroupDescr.query.get_or_404(group_id)
if not group.partition.groups_editable:
abort(404, "partition non editable")
return error_response(404, "partition non editable")
data = request.get_json(force=True) # may raise 400 Bad Request
group_name = data.get("group_name")
if group_name is not None:
if not GroupDescr.check_name(group.partition, group_name, existing=True):
abort(404, "invalid group_name")
return error_response(404, "invalid group_name")
group.group_name = group_name.strip()
db.session.add(group)
db.session.commit()
@ -253,12 +254,12 @@ def partition_create(formsemestre_id: int):
data = request.get_json(force=True) # may raise 400 Bad Request
partition_name = data.get("partition_name")
if partition_name is None:
abort(404, "missing partition_name or invalid data format")
return error_response(404, "missing partition_name or invalid data format")
if not Partition.check_name(formsemestre, partition_name):
abort(404, "invalid partition_name")
return error_response(404, "invalid partition_name")
numero = data.get("numero", 0)
if not isinstance(numero, int):
abort(404, "invalid type for numero")
return error_response(404, "invalid type for numero")
args = {
"formsemestre_id": formsemestre_id,
"partition_name": partition_name.strip(),
@ -269,7 +270,7 @@ def partition_create(formsemestre_id: int):
boolean_field, False if boolean_field != "groups_editable" else True
)
if not isinstance(value, bool):
abort(404, f"invalid type for {boolean_field}")
return error_response(404, f"invalid type for {boolean_field}")
args[boolean_field] = value
partition = Partition(**args)
@ -281,6 +282,52 @@ def partition_create(formsemestre_id: int):
return jsonify(partition.to_dict(with_groups=True))
@bp.route("/formsemestre/<int:formsemestre_id>/partitions/order", methods=["POST"])
@permission_required_api(Permission.ScoEtudChangeGroups, Permission.APIEditGroups)
def formsemestre_order_partitions(formsemestre_id: int):
"""Modifie l'ordre des partitions du formsemestre
JSON args: [partition_id1, partition_id2, ...]
"""
formsemestre: FormSemestre = FormSemestre.query.get(formsemestre_id)
partition_ids = request.get_json(force=True) # may raise 400 Bad Request
if not isinstance(partition_ids, int) and not all(
isinstance(x, int) for x in partition_ids
):
return error_response(
404,
message="paramètre liste des partitions invalide",
)
for p_id, numero in zip(partition_ids, range(len(partition_ids))):
p = Partition.query.get_or_404(p_id)
p.numero = numero
db.session.add(p)
db.session.commit()
return jsonify(formsemestre.to_dict())
@bp.route("/partition/<int:partition_id>/groups/order", methods=["POST"])
@permission_required_api(Permission.ScoEtudChangeGroups, Permission.APIEditGroups)
def partition_order_groups(partition_id: int):
"""Modifie l'ordre des groupes de la partition
JSON args: [group_id1, group_id2, ...]
"""
partition = Partition.query.get_or_404(partition_id)
group_ids = request.get_json(force=True) # may raise 400 Bad Request
if not isinstance(group_ids, int) and not all(
isinstance(x, int) for x in group_ids
):
return error_response(
404,
message="paramètre liste de groupe invalide",
)
for group_id, numero in zip(group_ids, range(len(group_ids))):
group = GroupDescr.query.get_or_404(group_id)
group.numero = numero
db.session.add(group)
db.session.commit()
return jsonify(partition.to_dict(with_groups=True))
@bp.route("/partition/<int:partition_id>/edit", methods=["POST"])
@permission_required_api(Permission.ScoEtudChangeGroups, Permission.APIEditGroups)
def partition_edit(partition_id: int):
@ -304,14 +351,14 @@ def partition_edit(partition_id: int):
if not Partition.check_name(
partition.formsemestre, partition_name, existing=True
):
abort(404, "invalid partition_name")
return error_response(404, "invalid partition_name")
partition.partition_name = partition_name.strip()
modified = True
numero = data.get("numero")
if numero is not None and numero != partition.numero:
if not isinstance(numero, int):
abort(404, "invalid type for numero")
return error_response(404, "invalid type for numero")
partition.numero = numero
modified = True
@ -319,7 +366,7 @@ def partition_edit(partition_id: int):
value = data.get(boolean_field)
if value is not None and value != getattr(partition, boolean_field):
if not isinstance(value, bool):
abort(404, f"invalid type for {boolean_field}")
return error_response(404, f"invalid type for {boolean_field}")
setattr(partition, boolean_field, value)
modified = True
@ -345,7 +392,7 @@ def partition_delete(partition_id: int):
"""
partition = Partition.query.get_or_404(partition_id)
if not partition.partition_name:
abort(404, "ne peut pas supprimer la partition par défaut")
return error_response(404, "ne peut pas supprimer la partition par défaut")
is_parcours = partition.is_parcours()
formsemestre: FormSemestre = partition.formsemestre
log(f"deleting partition {partition}")

View File

@ -279,7 +279,7 @@ class User(UserMixin, db.Model):
return False
# Role management
def add_role(self, role, dept):
def add_role(self, role: "Role", dept: str):
"""Add a role to this user.
:param role: Role to add.
"""
@ -287,7 +287,7 @@ class User(UserMixin, db.Model):
raise ScoValueError("add_role: rôle invalide")
self.user_roles.append(UserRole(user=self, role=role, dept=dept))
def add_roles(self, roles, dept):
def add_roles(self, roles: "list[Role]", dept: str):
"""Add roles to this user.
:param roles: Roles to add.
"""
@ -315,6 +315,20 @@ class User(UserMixin, db.Model):
if r is not None
)
def get_depts_with_permission(self, permission: int) -> list[str]:
"""Liste des acronymes de département dans lesquels cet utilisateur
possède la permission indiquée.
L'"acronyme" None signifie "tous les départements".
Si plusieurs permissions (plusieurs bits) sont indiquées, c'est un "ou":
les départements dans lesquels l'utilisateur a l'une des permissions.
"""
return [
user_role.dept
for user_role in UserRole.query.filter_by(user=self)
.join(Role)
.filter(Role.permissions.op("&")(permission) != 0)
]
def is_administrator(self):
"True if i'm an active SuperAdmin"
return self.active and self.has_permission(Permission.ScoSuperAdmin, dept=None)
@ -410,6 +424,9 @@ class Role(db.Model):
w=Permission.NBITS,
)
def __str__(self):
return f"{self.name}: perm={', '.join(Permission.permissions_names(self.permissions))}"
def add_permission(self, perm):
self.permissions |= perm

View File

@ -90,7 +90,7 @@ class Partition(db.Model):
d.pop("formsemestre", None)
if with_groups:
groups = sorted(self.groups, key=lambda g: g.group_name)
groups = sorted(self.groups, key=lambda g: (g.numero or 0, g.group_name))
# un dict et non plus une liste, pour JSON
d["groups"] = {
group.id: group.to_dict(with_partition=False) for group in groups
@ -109,6 +109,8 @@ class GroupDescr(db.Model):
partition_id = db.Column(db.Integer, db.ForeignKey("partition.id"))
# "A", "C2", ... (NULL for 'all'):
group_name = db.Column(db.String(GROUPNAME_STR_LEN))
# Numero = ordre de presentation)
numero = db.Column(db.Integer)
etuds = db.relationship(
"Identite",
@ -131,9 +133,12 @@ class GroupDescr(db.Model):
"id": self.id,
"partition_id": self.partition_id,
"name": self.group_name,
"numero": self.numero,
}
if with_partition:
d["partition"] = self.partition.to_dict(with_groups=False)
d["partition"] = sorted(
self.partition, key=lambda p: p.numero or 0
).to_dict(with_groups=False)
return d
@classmethod

View File

@ -87,7 +87,7 @@ partitionEditor = ndb.EditableTable(
)
groupEditor = ndb.EditableTable(
"group_descr", "group_id", ("group_id", "partition_id", "group_name")
"group_descr", "group_id", ("group_id", "partition_id", "group_name", "numero")
)
group_list = groupEditor.list

View File

@ -57,12 +57,13 @@ _SCO_PERMISSIONS = (
)
class Permission(object):
class Permission:
"Permissions for ScoDoc"
NBITS = 1 # maximum bits used (for formatting)
ALL_PERMISSIONS = [-1]
description = {} # { symbol : blah blah }
permission_by_name = {} # { symbol : int }
permission_by_value = {} # { int : symbol }
@staticmethod
def init_permissions():
@ -70,6 +71,7 @@ class Permission(object):
setattr(Permission, symbol, perm)
Permission.description[symbol] = description
Permission.permission_by_name[symbol] = perm
Permission.permission_by_value[perm] = symbol
max_perm = max(p[0] for p in _SCO_PERMISSIONS)
Permission.NBITS = max_perm.bit_length()
@ -78,5 +80,23 @@ class Permission(object):
"""Return permission mode (integer bit field), or None if it doesn't exist."""
return Permission.permission_by_name.get(permission_name)
@staticmethod
def get_name(permission: int) -> str:
"""Return permission name, or None if it doesn't exist."""
return Permission.permission_by_value.get(permission)
@staticmethod
def permissions_names(permissions: int) -> list[str]:
"""From a bit field, return list of permission names"""
names = []
mask = 1 << (permissions.bit_length() - 1)
while mask > 0:
if mask & permissions:
name = Permission.get_name(mask)
if name is not None:
names.append(name)
mask = mask >> 1
return names
Permission.init_permissions()

View File

@ -227,6 +227,13 @@ def create_role(rolename, permissions): # create-role
db.session.commit()
@app.cli.command()
def list_roles(): # list-roles
"""List all defined roles"""
for role in Role.query:
print(role)
@app.cli.command()
@click.argument("rolename")
@click.option("-a", "--add", "addpermissionname")
@ -269,6 +276,7 @@ def edit_role(rolename, addpermissionname=None, removepermissionname=None): # e
if perm_to_add or perm_to_remove:
db.session.add(role)
db.session.commit()
print(role)
@app.cli.command()
@ -290,21 +298,38 @@ def delete_role(rolename):
@click.option("-r", "--remove", "remove_role_name")
def user_role(username, dept_acronym=None, add_role_name=None, remove_role_name=None):
"""Add or remove a role to the given user in the given dept"""
user = User.query.filter_by(user_name=username).first()
user: User = User.query.filter_by(user_name=username).first()
if not user:
sys.stderr.write(f"user_role: user {username} does not exists\n")
return 1
# Sans argument, affiche les rôles de l'utilisateur
if dept_acronym is None and add_role_name is None and remove_role_name is None:
print(f"Roles for user {user.user_name}")
for user_role in sorted(
user.user_roles, key=lambda ur: (ur.dept or "", ur.role.name)
):
print(f"""{user_role.dept or "tous"}:\t{user_role.role.name}""")
if dept_acronym:
dept = models.Departement.query.filter_by(acronym=dept_acronym).first()
if dept is None:
sys.stderr.write(f"Erreur: le departement {dept} n'existe pas !\n")
sys.stderr.write(f"Erreur: le departement {dept_acronym} n'existe pas !\n")
return 2
if add_role_name:
role = Role.query.filter_by(name=add_role_name).first()
if role is None:
sys.stderr.write(
f"""user_role: role {add_role_name} does not exists
(use list-roles to display existing roles)\n"""
)
return 2
user.add_role(role, dept_acronym)
if remove_role_name:
role = Role.query.filter_by(name=remove_role_name).first()
if role is None:
sys.stderr.write(f"user_role: role {remove_role_name} does not exists\n")
return 2
user_role = UserRole.query.filter(
UserRole.role == role, UserRole.user == user, UserRole.dept == dept_acronym
).first()
@ -355,7 +380,7 @@ def create_dept(dept): # create-dept
@app.cli.command()
@click.argument("depts", nargs=-1)
def list_depts(depts=""): # list-dept
def list_depts(depts=""): # list-depts
"""If dept exists, print it, else nothing.
Called without arguments, list all depts along with their ids.
"""

View File

@ -181,6 +181,20 @@ POST_JSON(
POST_JSON(f"/partition/{2379}/delete")
#
POST_JSON(
"/partition/2264/groups/order",
data=[5563, 5562, 5561, 5560, 5558, 5557, 5316, 5315],
)
POST_JSON(
"/formsemestre/1063/partitions/order",
data=[2264, 2263, 2265, 2266, 2267, 2372, 2378],
)
GET(f"/partition/2264")
# Recherche de formsemestres
sems = GET(f"/formsemestres/query?etape_apo=V1RT&annee_scolaire=2021")