API: utilisateurs /user, /users/query + tests unitaires

This commit is contained in:
Emmanuel Viennet 2022-08-05 17:05:24 +02:00
parent 1a096b53bc
commit dae762c3b1
10 changed files with 245 additions and 51 deletions

View File

@ -28,10 +28,11 @@ from app.api import (
billets_absences, billets_absences,
departements, departements,
etudiants, etudiants,
evaluations,
formations, formations,
formsemestres, formsemestres,
jury,
logos, logos,
partitions, partitions,
evaluations, users,
jury,
) )

View File

@ -1,6 +1,18 @@
############################################### Departements ########################################################## ##############################################################################
# ScoDoc
# Copyright (c) 1999 - 2022 Emmanuel Viennet. All rights reserved.
# See LICENSE
##############################################################################
"""
ScoDoc 9 API : accès aux départements
Note: les routes /departement[s] sont publiées sur l'API (/ScoDoc/api/),
mais évidemment pas sur l'API web (/ScoDoc/<dept>/api).
"""
from flask import jsonify, request from flask import jsonify, request
from flask_login import login_required
import app import app
from app import db, log from app import db, log
@ -24,7 +36,8 @@ def get_departement(dept_ident: str) -> Departement:
return Departement.query.get_or_404(dept_id) return Departement.query.get_or_404(dept_id)
@bp.route("/departements", methods=["GET"]) @bp.route("/departements")
@login_required
@scodoc @scodoc
@permission_required(Permission.ScoView) @permission_required(Permission.ScoView)
def departements_list(): def departements_list():
@ -32,7 +45,8 @@ def departements_list():
return jsonify([dept.to_dict() for dept in Departement.query]) return jsonify([dept.to_dict() for dept in Departement.query])
@bp.route("/departements_ids", methods=["GET"]) @bp.route("/departements_ids")
@login_required
@scodoc @scodoc
@permission_required(Permission.ScoView) @permission_required(Permission.ScoView)
def departements_ids(): def departements_ids():
@ -40,7 +54,8 @@ def departements_ids():
return jsonify([dept.id for dept in Departement.query]) return jsonify([dept.id for dept in Departement.query])
@bp.route("/departement/<string:acronym>", methods=["GET"]) @bp.route("/departement/<string:acronym>")
@login_required
@scodoc @scodoc
@permission_required(Permission.ScoView) @permission_required(Permission.ScoView)
def departement(acronym: str): def departement(acronym: str):
@ -60,7 +75,8 @@ def departement(acronym: str):
return jsonify(dept.to_dict()) return jsonify(dept.to_dict())
@bp.route("/departement/id/<int:dept_id>", methods=["GET"]) @bp.route("/departement/id/<int:dept_id>")
@login_required
@scodoc @scodoc
@permission_required(Permission.ScoView) @permission_required(Permission.ScoView)
def departement_by_id(dept_id: int): def departement_by_id(dept_id: int):
@ -72,6 +88,7 @@ def departement_by_id(dept_id: int):
@bp.route("/departement/create", methods=["POST"]) @bp.route("/departement/create", methods=["POST"])
@login_required
@scodoc @scodoc
@permission_required(Permission.ScoSuperAdmin) @permission_required(Permission.ScoSuperAdmin)
def departement_create(): def departement_create():
@ -96,6 +113,7 @@ def departement_create():
@bp.route("/departement/<string:acronym>/edit", methods=["POST"]) @bp.route("/departement/<string:acronym>/edit", methods=["POST"])
@login_required
@scodoc @scodoc
@permission_required(Permission.ScoSuperAdmin) @permission_required(Permission.ScoSuperAdmin)
def departement_edit(acronym): def departement_edit(acronym):
@ -119,6 +137,7 @@ def departement_edit(acronym):
@bp.route("/departement/<string:acronym>/delete", methods=["POST"]) @bp.route("/departement/<string:acronym>/delete", methods=["POST"])
@login_required
@scodoc @scodoc
@permission_required(Permission.ScoSuperAdmin) @permission_required(Permission.ScoSuperAdmin)
def departement_delete(acronym): def departement_delete(acronym):
@ -132,6 +151,7 @@ def departement_delete(acronym):
@bp.route("/departement/<string:acronym>/etudiants", methods=["GET"]) @bp.route("/departement/<string:acronym>/etudiants", methods=["GET"])
@login_required
@scodoc @scodoc
@permission_required(Permission.ScoView) @permission_required(Permission.ScoView)
def dept_etudiants(acronym: str): def dept_etudiants(acronym: str):
@ -160,7 +180,8 @@ def dept_etudiants(acronym: str):
return jsonify([etud.to_dict_short() for etud in dept.etudiants]) return jsonify([etud.to_dict_short() for etud in dept.etudiants])
@bp.route("/departement/id/<int:dept_id>/etudiants", methods=["GET"]) @bp.route("/departement/id/<int:dept_id>/etudiants")
@login_required
@scodoc @scodoc
@permission_required(Permission.ScoView) @permission_required(Permission.ScoView)
def dept_etudiants_by_id(dept_id: int): def dept_etudiants_by_id(dept_id: int):
@ -171,7 +192,8 @@ def dept_etudiants_by_id(dept_id: int):
return jsonify([etud.to_dict_short() for etud in dept.etudiants]) return jsonify([etud.to_dict_short() for etud in dept.etudiants])
@bp.route("/departement/<string:acronym>/formsemestres_ids", methods=["GET"]) @bp.route("/departement/<string:acronym>/formsemestres_ids")
@login_required
@scodoc @scodoc
@permission_required(Permission.ScoView) @permission_required(Permission.ScoView)
def dept_formsemestres_ids(acronym: str): def dept_formsemestres_ids(acronym: str):
@ -180,7 +202,8 @@ def dept_formsemestres_ids(acronym: str):
return jsonify([formsemestre.id for formsemestre in dept.formsemestres]) return jsonify([formsemestre.id for formsemestre in dept.formsemestres])
@bp.route("/departement/id/<int:dept_id>/formsemestres_ids", methods=["GET"]) @bp.route("/departement/id/<int:dept_id>/formsemestres_ids")
@login_required
@scodoc @scodoc
@permission_required(Permission.ScoView) @permission_required(Permission.ScoView)
def dept_formsemestres_ids_by_id(dept_id: int): def dept_formsemestres_ids_by_id(dept_id: int):
@ -189,7 +212,8 @@ def dept_formsemestres_ids_by_id(dept_id: int):
return jsonify([formsemestre.id for formsemestre in dept.formsemestres]) return jsonify([formsemestre.id for formsemestre in dept.formsemestres])
@bp.route("/departement/<string:acronym>/formsemestres_courants", methods=["GET"]) @bp.route("/departement/<string:acronym>/formsemestres_courants")
@login_required
@scodoc @scodoc
@permission_required(Permission.ScoView) @permission_required(Permission.ScoView)
def dept_formsemestres_courants(acronym: str): def dept_formsemestres_courants(acronym: str):
@ -243,7 +267,8 @@ def dept_formsemestres_courants(acronym: str):
return jsonify([d.to_dict(convert_objects=True) for d in formsemestres]) return jsonify([d.to_dict(convert_objects=True) for d in formsemestres])
@bp.route("/departement/id/<int:dept_id>/formsemestres_courants", methods=["GET"]) @bp.route("/departement/id/<int:dept_id>/formsemestres_courants")
@login_required
@scodoc @scodoc
@permission_required(Permission.ScoView) @permission_required(Permission.ScoView)
def dept_formsemestres_courants_by_id(dept_id: int): def dept_formsemestres_courants_by_id(dept_id: int):

81
app/api/users.py Normal file
View File

@ -0,0 +1,81 @@
##############################################################################
# ScoDoc
# Copyright (c) 1999 - 2022 Emmanuel Viennet. All rights reserved.
# See LICENSE
##############################################################################
"""
ScoDoc 9 API : accès aux utilisateurs
"""
from flask import g, jsonify, request
from flask_login import current_user, login_required
import app
from app import db, log
from app.api import api_bp as bp, api_web_bp
from app.api.errors import error_response
from app.auth.models import User, Role, UserRole
from app.decorators import scodoc, permission_required
from app.models import Departement
from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_permissions import Permission
@bp.route("/user/<int:uid>")
@api_web_bp.route("/user/<int:uid>")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
def user_info(uid: int):
"""
Info sur un compte utilisateur scodoc
"""
user: User = User.query.get(uid)
if user is None:
return error_response(404, "user not found")
if g.scodoc_dept:
allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersView)
if user.dept not in allowed_depts:
return error_response(404, "user not found")
return jsonify(user.to_dict())
@bp.route("/users/query")
@api_web_bp.route("/users/query")
@login_required
@scodoc
@permission_required(Permission.ScoView)
def users_info_query():
"""Utilisateurs, filtrés par dept, active ou début nom
/users/query?departement=dept_acronym&active=1&starts_with=<str:nom>
Si accès via API web, seuls les utilisateurs "accessibles" (selon les
permissions) sont retournés: le département de l'URL est ignoré, seules
les permissions de l'utilisateur sont prises en compte.
"""
query = User.query
active = request.args.get("active")
if active is not None:
active = bool(str(active))
query = query.filter_by(active=active)
departement = request.args.get("departement")
if departement is not None:
query = query.filter_by(dept=departement or None)
starts_with = request.args.get("starts_with")
if starts_with is not None:
# remove % and _ for security
starts_with = starts_with.translate({ord(c): None for c in "%_"})
query = query.filter(User.nom.ilike(starts_with + "%"))
# Filtre selon permissions:
query = (
query.join(UserRole, (UserRole.dept == User.dept) | (UserRole.dept == None))
.filter(UserRole.user == current_user)
.join(Role, UserRole.role_id == Role.id)
.filter(Role.permissions.op("&")(Permission.ScoUsersView) != 0)
)
query = query.order_by(User.user_name)
return jsonify([u.to_dict() for u in query])

View File

@ -174,18 +174,18 @@ class User(UserMixin, db.Model):
data = { data = {
"date_expiration": self.date_expiration.isoformat() + "Z" "date_expiration": self.date_expiration.isoformat() + "Z"
if self.date_expiration if self.date_expiration
else "", else None,
"date_modif_passwd": self.date_modif_passwd.isoformat() + "Z" "date_modif_passwd": self.date_modif_passwd.isoformat() + "Z"
if self.date_modif_passwd if self.date_modif_passwd
else "", else None,
"date_created": self.date_created.isoformat() + "Z" "date_created": self.date_created.isoformat() + "Z"
if self.date_created if self.date_created
else "", else None,
"dept": (self.dept or ""), # sco8 "dept": self.dept,
"id": self.id, "id": self.id,
"active": self.active, "active": self.active,
"status_txt": "actif" if self.active else "fermé", "status_txt": "actif" if self.active else "fermé",
"last_seen": self.last_seen.isoformat() + "Z" if self.last_seen else "", "last_seen": self.last_seen.isoformat() + "Z" if self.last_seen else None,
"nom": (self.nom or ""), # sco8 "nom": (self.nom or ""), # sco8
"prenom": (self.prenom or ""), # sco8 "prenom": (self.prenom or ""), # sco8
"roles_string": self.get_roles_string(), # eg "Ens_RT, Ens_Info" "roles_string": self.get_roles_string(), # eg "Ens_RT, Ens_Info"

View File

@ -378,12 +378,12 @@ def search_inscr_etud_by_nip(code_nip, format="json"):
T = [] T = []
for etuds in result: for etuds in result:
if etuds: if etuds:
DeptId = etuds[0]["dept"] dept_id = etuds[0]["dept"]
for e in etuds: for e in etuds:
for sem in e["sems"]: for sem in e["sems"]:
T.append( T.append(
{ {
"dept": DeptId, "dept": dept_id,
"etudid": e["etudid"], "etudid": e["etudid"],
"code_nip": e["code_nip"], "code_nip": e["code_nip"],
"civilite_str": e["civilite_str"], "civilite_str": e["civilite_str"],

View File

@ -321,7 +321,7 @@ def check_modif_user(
# check département # check département
if ( if (
enforce_optionals enforce_optionals
and dept != "" and dept
and Departement.query.filter_by(acronym=dept).first() is None and Departement.query.filter_by(acronym=dept).first() is None
): ):
return False, "département '%s' inexistant" % dept + MSG_OPT return False, "département '%s' inexistant" % dept + MSG_OPT

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Test Logos """Test API: départements
Utilisation : Utilisation :
créer les variables d'environnement: (indiquer les valeurs créer les variables d'environnement: (indiquer les valeurs

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""Test Logos """Test API : groupes et partitions
Utilisation : Utilisation :
créer les variables d'environnement: (indiquer les valeurs créer les variables d'environnement: (indiquer les valeurs

View File

@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
"""Test API : utilisateurs
Utilisation :
pytest tests/api/test_api_users.py
"""
from tests.api.setup_test_api import (
API_URL,
CHECK_CERTIFICATE,
GET,
POST_JSON,
api_headers,
api_admin_headers,
get_auth_headers,
)
def test_list_users(api_admin_headers):
"""
Routes: /user/<int:uid>
/users/query?departement=dept_acronym&active=1&like=<str:nom>
"""
admin_h = api_admin_headers
depts = GET("/departements", headers=admin_h)
assert len(depts) > 0
u = GET("/user/1", headers=admin_h)
assert u["id"] == 1
assert u["user_name"]
assert u["date_expiration"] is None
dept_u = u["dept"]
# Tous les utilisateurs, vus par SuperAdmin:
users = GET("/users/query", headers=admin_h)
# Les utilisateurs de chaque département (+ ceux sans département)
all_users = []
for acronym in [dept["acronym"] for dept in depts] + [""]:
all_users += GET(f"/users/query?departement={acronym}", headers=admin_h)
all_users.sort(key=lambda u: u["user_name"])
assert len(all_users) == len(users)
# On a créé un user "u_" par département:
u_users = GET("/users/query?starts_with=U ", headers=admin_h)
assert len(u_users) == len(depts)
assert len(GET("/users/query?departement=AA", headers=admin_h)) == 1
assert len(GET("/users/query?departement=AA&starts_with=U ", headers=admin_h)) == 1
assert (
len(
GET(
"/users/query?departement=AA&starts_with=XXX",
headers=admin_h,
)
)
== 0
)
# Utilisateurs vus par d'autres utilisateurs (droits accès)
for i, u in enumerate(u for u in u_users if u["dept"] != "TAPI"):
headers = get_auth_headers(u["user_name"], "test")
users_by_u = GET("/users/query", headers=headers)
assert len(users_by_u) == 4 + i
# explication: tous ont le droit de voir les 3 users de TAPI
# (test, other et u_TAPI)
# plus l'utilisateur de chaque département jusqu'au leur
# (u_AA voit AA, u_BB voit AA et BB, etc)

View File

@ -19,7 +19,6 @@ from app import models
from app.models import departements from app.models import departements
from app.models import ( from app.models import (
Absence, Absence,
ApcReferentielCompetences,
Departement, Departement,
Formation, Formation,
FormSemestre, FormSemestre,
@ -47,13 +46,9 @@ REFCOMP_FILENAME = (
) )
def init_departement(acronym: str) -> Departement: def create_departements(acronyms: list[str]) -> list[Departement]:
"Create dept, and switch context into it." "Create depts"
import app as mapp return [departements.create_dept(acronym) for acronym in acronyms]
dept = departements.create_dept(acronym)
mapp.set_sco_dept(acronym)
return dept
def import_formation(dept_id: int) -> Formation: def import_formation(dept_id: int) -> Formation:
@ -78,28 +73,33 @@ def import_formation(dept_id: int) -> Formation:
return formation return formation
def create_users(dept: Departement) -> tuple: def create_users(depts: list[Departement]) -> tuple:
"""Crée les utilisateurs nécessaires aux tests""" """Crée les roles et utilisateurs nécessaires aux tests"""
# Un utilisateur "test" (passwd test) pouvant lire l'API dept = depts[0]
user = User(user_name="test", nom="Doe", prenom="John", dept=dept.acronym) # Le rôle standard LecteurAPI existe déjà: lui donne les permissions
user.set_password("test") # ScoView, ScoAbsAddBillet, ScoEtudChangeGroups
db.session.add(user) role_lecteur = Role.query.filter_by(name="LecteurAPI").first()
if role_lecteur is None:
# Le rôle standard LecteurAPI existe déjà
role = Role.query.filter_by(name="LecteurAPI").first()
if role is None:
print("Erreur: rôle LecteurAPI non existant") print("Erreur: rôle LecteurAPI non existant")
sys.exit(1) sys.exit(1)
perm_sco_view = Permission.get_by_name("ScoView") perm_sco_view = Permission.get_by_name("ScoView")
role.add_permission(perm_sco_view) role_lecteur.add_permission(perm_sco_view)
# Edition billets # Edition billets
perm_billets = Permission.get_by_name("ScoAbsAddBillet") perm_billets = Permission.get_by_name("ScoAbsAddBillet")
role.add_permission(perm_billets) role_lecteur.add_permission(perm_billets)
perm_groups = Permission.get_by_name("ScoEtudChangeGroups") perm_groups = Permission.get_by_name("ScoEtudChangeGroups")
role.add_permission(perm_groups) role_lecteur.add_permission(perm_groups)
db.session.add(role) db.session.add(role_lecteur)
user.add_role(role, None) # Un role pour juste voir les utilisateurs
role_users_viewer = Role(name="UsersViewer", permissions=Permission.ScoUsersView)
db.session.add(role_users_viewer)
# Un utilisateur "test" (passwd test) pouvant lire l'API
user_test = User(user_name="test", nom="Doe", prenom="John", dept=dept.acronym)
user_test.set_password("test")
db.session.add(user_test)
user_test.add_role(role_lecteur, None)
# Un utilisateur "other" n'ayant aucune permission sur l'API # Un utilisateur "other" n'ayant aucune permission sur l'API
other = User(user_name="other", nom="Sans", prenom="Permission", dept=dept.acronym) other = User(user_name="other", nom="Sans", prenom="Permission", dept=dept.acronym)
@ -110,14 +110,32 @@ def create_users(dept: Departement) -> tuple:
admin_api = User(user_name="admin_api", nom="Admin", prenom="API") admin_api = User(user_name="admin_api", nom="Admin", prenom="API")
admin_api.set_password("admin_api") admin_api.set_password("admin_api")
db.session.add(admin_api) db.session.add(admin_api)
role = Role.query.filter_by(name="SuperAdmin").first() role_super_admin = Role.query.filter_by(name="SuperAdmin").first()
if role is None: if role_super_admin is None:
print("Erreur: rôle SuperAdmin non existant") print("Erreur: rôle SuperAdmin non existant")
sys.exit(1) sys.exit(1)
admin_api.add_role(role, None) admin_api.add_role(role_super_admin, None)
# Des utilisateurs voyant certains utilisateurs...
users = []
for dept in depts:
u = User(
user_name=f"u_{dept.acronym}",
nom=f"U {dept.acronym}",
prenom="lambda",
dept=dept.acronym,
)
u.set_password("test")
users.append(u)
db.session.add(u)
# Roles pour tester les fonctions sur les utilisateurs
for i, u in enumerate(users):
for dept in depts[: i + 1]:
u.add_role(role_users_viewer, dept.acronym)
u.add_role(role_lecteur, None) # necessaire pour avoir le jeton
db.session.commit() db.session.commit()
return user, other return user_test, other
def create_fake_etud(dept: Departement) -> Identite: def create_fake_etud(dept: Departement) -> Identite:
@ -343,8 +361,12 @@ def init_test_database():
Création d'un département et de son contenu pour les tests Création d'un département et de son contenu pour les tests
""" """
dept = init_departement("TAPI") import app as mapp
user_lecteur, user_autre = create_users(dept)
depts = create_departements(["TAPI", "AA", "BB", "CC", "DD"])
dept = depts[0]
mapp.set_sco_dept(dept.acronym)
user_lecteur, user_autre = create_users(depts)
with sco_cache.DeferredSemCacheManager(): with sco_cache.DeferredSemCacheManager():
etuds = create_etuds(dept) etuds = create_etuds(dept)
formation = import_formation(dept.id) formation = import_formation(dept.id)