ScoDoc/app/api/users.py
2023-07-11 09:48:06 +02:00

423 lines
14 KiB
Python

##############################################################################
# ScoDoc
# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved.
# See LICENSE
##############################################################################
"""
ScoDoc 9 API : accès aux utilisateurs
"""
from flask import g, request
from flask_json import as_json
from flask_login import current_user, login_required
from app import db
from app.api import api_bp as bp, api_web_bp, API_CLIENT_ERROR
from app.scodoc.sco_utils import json_error
from app.auth.models import User, Role, UserRole
from app.auth.models import is_valid_password
from app.decorators import scodoc, permission_required
from app.models import Departement
from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_permissions import Permission
from app.scodoc import sco_utils as scu
@bp.route("/user/<int:uid>")
@api_web_bp.route("/user/<int:uid>")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
@as_json
def user_info(uid: int):
"""
Info sur un compte utilisateur scodoc
"""
user: User = db.session.get(User, uid)
if user is None:
return json_error(404, "user not found")
if g.scodoc_dept:
allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersView)
if (None not in allowed_depts) and (user.dept not in allowed_depts):
return json_error(404, "user not found")
return user.to_dict()
@bp.route("/users/query")
@api_web_bp.route("/users/query")
@login_required
@scodoc
@permission_required(Permission.ScoView)
@as_json
def users_info_query():
"""Utilisateurs, filtrés par dept, active ou début nom
/users/query?departement=dept_acronym&active=1&starts_with=<string:nom>
Seuls les utilisateurs "accessibles" (selon les permissions) sont retournés.
Si accès via API web, le département de l'URL est ignoré, seules
les permissions de l'utilisateur sont prises en compte.
"""
query = User.query
active = request.args.get("active")
if active is not None:
active = bool(str(active))
query = query.filter_by(active=active)
departement = request.args.get("departement")
if departement is not None:
query = query.filter_by(dept=departement or None)
starts_with = request.args.get("starts_with")
if starts_with is not None:
# remove % and _ for security
starts_with = starts_with.translate({ord(c): None for c in "%_"})
query = query.filter(User.nom.ilike(starts_with + "%"))
# Filtre selon permissions:
query = (
query.join(UserRole, (UserRole.dept == User.dept) | (UserRole.dept == None))
.filter(UserRole.user == current_user)
.join(Role, UserRole.role_id == Role.id)
.filter(Role.permissions.op("&")(Permission.ScoUsersView) != 0)
)
query = query.order_by(User.user_name)
return [user.to_dict() for user in query]
@bp.route("/user/create", methods=["POST"])
@api_web_bp.route("/user/create", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoUsersAdmin)
@as_json
def user_create():
"""Création d'un utilisateur
The request content type should be "application/json":
{
"user_name": str,
"dept": str or null,
"nom": str,
"prenom": str,
"active":bool (default True)
}
"""
data = request.get_json(force=True) # may raise 400 Bad Request
user_name = data.get("user_name")
if not user_name:
return json_error(404, "empty user_name")
user = User.query.filter_by(user_name=user_name).first()
if user:
return json_error(404, f"user_create: user {user} already exists\n")
dept = data.get("dept")
if dept == "@all":
dept = None
allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersAdmin)
if (None not in allowed_depts) and (dept not in allowed_depts):
return json_error(403, "user_create: departement non autorise")
if (dept is not None) and (
Departement.query.filter_by(acronym=dept).first() is None
):
return json_error(404, "user_create: departement inexistant")
nom = data.get("nom")
prenom = data.get("prenom")
active = scu.to_bool(data.get("active", True))
user = User(user_name=user_name, active=active, dept=dept, nom=nom, prenom=prenom)
db.session.add(user)
db.session.commit()
return user.to_dict()
@bp.route("/user/<int:uid>/edit", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/edit", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoUsersAdmin)
@as_json
def user_edit(uid: int):
"""Modification d'un utilisateur
Champs modifiables:
{
"dept": str or null,
"nom": str,
"prenom": str,
"active":bool
}
"""
data = request.get_json(force=True) # may raise 400 Bad Request
user: User = User.query.get_or_404(uid)
# L'utilisateur doit avoir le droit dans le département de départ et celui d'arrivée
orig_dept = user.dept
dest_dept = data.get("dept", False)
if dest_dept is not False:
if dest_dept == "@all":
dest_dept = None
allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersAdmin)
if (None not in allowed_depts) and (
(orig_dept not in allowed_depts) or (dest_dept not in allowed_depts)
):
return json_error(403, "user_edit: departement non autorise")
if dest_dept != orig_dept:
if (dest_dept is not None) and (
Departement.query.filter_by(acronym=dest_dept).first() is None
):
return json_error(404, "user_edit: departement inexistant")
user.dept = dest_dept
user.nom = data.get("nom", user.nom)
user.prenom = data.get("prenom", user.prenom)
user.active = scu.to_bool(data.get("active", user.active))
db.session.add(user)
db.session.commit()
return user.to_dict()
@bp.route("/user/<int:uid>/password", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/password", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoUsersAdmin)
@as_json
def user_password(uid: int):
"""Modification du mot de passe d'un utilisateur
Champs modifiables:
{
"password": str
}
Si le mot de passe ne convient pas, erreur 400.
"""
data = request.get_json(force=True) # may raise 400 Bad Request
user: User = User.query.get_or_404(uid)
password = data.get("password")
if not password:
return json_error(404, "user_password: missing password")
if not is_valid_password(password):
return json_error(API_CLIENT_ERROR, "user_password: invalid password")
allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersAdmin)
if (None not in allowed_depts) and ((user.dept not in allowed_depts)):
return json_error(403, "user_password: departement non autorise")
user.set_password(password)
db.session.add(user)
db.session.commit()
return user.to_dict()
@bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
@bp.route(
"/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
methods=["POST"],
)
@api_web_bp.route(
"/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def user_role_add(uid: int, role_name: str, dept: str = None):
"""Add a role to the user"""
user: User = User.query.get_or_404(uid)
role: Role = Role.query.filter_by(name=role_name).first_or_404()
if dept is not None: # check
_ = Departement.query.filter_by(acronym=dept).first_or_404()
allowed_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
if (None not in allowed_depts) and (dept not in allowed_depts):
return json_error(403, "user_role_add: departement non autorise")
user.add_role(role, dept)
db.session.add(user)
db.session.commit()
return user.to_dict()
@bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
@bp.route(
"/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
methods=["POST"],
)
@api_web_bp.route(
"/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def user_role_remove(uid: int, role_name: str, dept: str = None):
"""Remove the role from the user"""
user: User = User.query.get_or_404(uid)
role: Role = Role.query.filter_by(name=role_name).first_or_404()
if dept is not None: # check
_ = Departement.query.filter_by(acronym=dept).first_or_404()
allowed_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
if (None not in allowed_depts) and (dept not in allowed_depts):
return json_error(403, "user_role_remove: departement non autorise")
query = UserRole.query.filter(UserRole.role == role, UserRole.user == user)
if dept is not None:
query = query.filter(UserRole.dept == dept)
user_role = query.first()
if user_role:
db.session.delete(user_role)
db.session.add(user)
db.session.commit()
return user.to_dict()
@bp.route("/permissions")
@api_web_bp.route("/permissions")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
@as_json
def list_permissions():
"""Liste des noms de permissions définies"""
return list(Permission.permission_by_name.keys())
@bp.route("/role/<string:role_name>")
@api_web_bp.route("/role/<string:role_name>")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
@as_json
def list_role(role_name: str):
"""Un rôle"""
return Role.query.filter_by(name=role_name).first_or_404().to_dict()
@bp.route("/roles")
@api_web_bp.route("/roles")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
@as_json
def list_roles():
"""Tous les rôles définis"""
return [role.to_dict() for role in Role.query]
@bp.route(
"/role/<string:role_name>/add_permission/<string:perm_name>",
methods=["POST"],
)
@api_web_bp.route(
"/role/<string:role_name>/add_permission/<string:perm_name>",
methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_permission_add(role_name: str, perm_name: str):
"""Add permission to role"""
role: Role = Role.query.filter_by(name=role_name).first_or_404()
permission = Permission.get_by_name(perm_name)
if permission is None:
return json_error(404, "role_permission_add: permission inconnue")
role.add_permission(permission)
db.session.add(role)
db.session.commit()
return role.to_dict()
@bp.route(
"/role/<string:role_name>/remove_permission/<string:perm_name>",
methods=["POST"],
)
@api_web_bp.route(
"/role/<string:role_name>/remove_permission/<string:perm_name>",
methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_permission_remove(role_name: str, perm_name: str):
"""Remove permission from role"""
role: Role = Role.query.filter_by(name=role_name).first_or_404()
permission = Permission.get_by_name(perm_name)
if permission is None:
return json_error(404, "role_permission_remove: permission inconnue")
role.remove_permission(permission)
db.session.add(role)
db.session.commit()
return role.to_dict()
@bp.route("/role/create/<string:role_name>", methods=["POST"])
@api_web_bp.route("/role/create/<string:role_name>", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_create(role_name: str):
"""Create a new role with permissions.
{
"permissions" : [ 'ScoView', ... ]
}
"""
role: Role = Role.query.filter_by(name=role_name).first()
if role:
return json_error(404, "role_create: role already exists")
role = Role(name=role_name)
data = request.get_json(force=True) # may raise 400 Bad Request
permissions = data.get("permissions")
if permissions:
try:
role.set_named_permissions(permissions)
except ScoValueError:
return json_error(404, "role_create: invalid permissions")
db.session.add(role)
db.session.commit()
return role.to_dict()
@bp.route("/role/<string:role_name>/edit", methods=["POST"])
@api_web_bp.route("/role/<string:role_name>/edit", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_edit(role_name: str):
"""Edit a role. On peut spécifier un nom et/ou des permissions.
{
"name" : name
"permissions" : [ 'ScoView', ... ]
}
"""
role: Role = Role.query.filter_by(name=role_name).first_or_404()
data = request.get_json(force=True) # may raise 400 Bad Request
permissions = data.get("permissions", False)
if permissions is not False:
try:
role.set_named_permissions(permissions)
except ScoValueError:
return json_error(404, "role_create: invalid permissions")
role_name = data.get("role_name")
if role_name and role_name != role.name:
existing_role: Role = Role.query.filter_by(name=role_name).first()
if existing_role:
return json_error(404, "role_edit: role name already exists")
role.name = role_name
db.session.add(role)
db.session.commit()
return role.to_dict()
@bp.route("/role/<string:role_name>/delete", methods=["POST"])
@api_web_bp.route("/role/<string:role_name>/delete", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
@as_json
def role_delete(role_name: str):
"""Delete a role"""
role: Role = Role.query.filter_by(name=role_name).first_or_404()
db.session.delete(role)
db.session.commit()
return {"OK": True}