ScoDoc-Lille/app/api/users.py

251 lines
8.7 KiB
Python

##############################################################################
# ScoDoc
# Copyright (c) 1999 - 2022 Emmanuel Viennet. All rights reserved.
# See LICENSE
##############################################################################
"""
ScoDoc 9 API : accès aux utilisateurs
"""
from flask import g, jsonify, request
from flask_login import current_user, login_required
import app
from app import db, log
from app.api import api_bp as bp, api_web_bp
from app.api.errors import error_response
from app.auth.models import User, Role, UserRole
from app.decorators import scodoc, permission_required
from app.models import Departement
from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_permissions import Permission
from app.scodoc import sco_utils as scu
@bp.route("/user/<int:uid>")
@api_web_bp.route("/user/<int:uid>")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
def user_info(uid: int):
"""
Info sur un compte utilisateur scodoc
"""
user: User = User.query.get(uid)
if user is None:
return error_response(404, "user not found")
if g.scodoc_dept:
allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersView)
if user.dept not in allowed_depts:
return error_response(404, "user not found")
return jsonify(user.to_dict())
@bp.route("/users/query")
@api_web_bp.route("/users/query")
@login_required
@scodoc
@permission_required(Permission.ScoView)
def users_info_query():
"""Utilisateurs, filtrés par dept, active ou début nom
/users/query?departement=dept_acronym&active=1&starts_with=<str:nom>
Si accès via API web, seuls les utilisateurs "accessibles" (selon les
permissions) sont retournés: le département de l'URL est ignoré, seules
les permissions de l'utilisateur sont prises en compte.
"""
query = User.query
active = request.args.get("active")
if active is not None:
active = bool(str(active))
query = query.filter_by(active=active)
departement = request.args.get("departement")
if departement is not None:
query = query.filter_by(dept=departement or None)
starts_with = request.args.get("starts_with")
if starts_with is not None:
# remove % and _ for security
starts_with = starts_with.translate({ord(c): None for c in "%_"})
query = query.filter(User.nom.ilike(starts_with + "%"))
# Filtre selon permissions:
query = (
query.join(UserRole, (UserRole.dept == User.dept) | (UserRole.dept == None))
.filter(UserRole.user == current_user)
.join(Role, UserRole.role_id == Role.id)
.filter(Role.permissions.op("&")(Permission.ScoUsersView) != 0)
)
query = query.order_by(User.user_name)
return jsonify([user.to_dict() for user in query])
@bp.route("/user/create", methods=["POST"])
@api_web_bp.route("/user/create", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoUsersAdmin)
def user_create():
"""Création d'un utilisateur
The request content type should be "application/json":
{
"user_name": str,
"dept": str or null,
"nom": str,
"prenom": str,
"active":bool (default True)
}
"""
data = request.get_json(force=True) # may raise 400 Bad Request
user_name = data.get("user_name")
if not user_name:
return error_response(404, "empty user_name")
user = User.query.filter_by(user_name=user_name).first()
if user:
return error_response(404, f"user_create: user {user} already exists\n")
dept = data.get("dept")
if dept == "@all":
dept = None
allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersAdmin)
if dept not in allowed_depts:
return error_response(403, "user_create: departement non autorise")
if (dept is not None) and (
Departement.query.filter_by(acronym=dept).first() is None
):
return error_response(404, "user_create: departement inexistant")
nom = data.get("nom")
prenom = data.get("prenom")
active = scu.to_bool(data.get("active", True))
user = User(user_name=user_name, active=active, dept=dept, nom=nom, prenom=prenom)
db.session.add(user)
db.session.commit()
return jsonify(user.to_dict())
@bp.route("/user/edit/<int:uid>", methods=["POST"])
@api_web_bp.route("/user/edit/<int:uid>", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoUsersAdmin)
def user_edit(uid: int):
"""Modification d'un utilisateur
Champs modifiables:
{
"dept": str or null,
"nom": str,
"prenom": str,
"active":bool
}
"""
data = request.get_json(force=True) # may raise 400 Bad Request
user: User = User.query.get_or_404(uid)
# L'utilisateur doit avoir le droit dans le département de départ et celui d'arrivée
orig_dept = user.dept
dest_dept = data.get("dept", False)
if dest_dept is not False:
if dest_dept == "@all":
dest_dept = None
allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersAdmin)
if (None not in allowed_depts) and (
(orig_dept not in allowed_depts) or (dest_dept not in allowed_depts)
):
return error_response(403, "user_edit: departement non autorise")
if dest_dept != orig_dept:
if (dest_dept is not None) and (
Departement.query.filter_by(acronym=dest_dept).first() is None
):
return error_response(404, "user_edit: departement inexistant")
user.dept = dest_dept
user.nom = data.get("nom", user.nom)
user.prenom = data.get("prenom", user.prenom)
user.active = scu.to_bool(data.get("active", user.active))
db.session.add(user)
db.session.commit()
return jsonify(user.to_dict())
@bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
@bp.route(
"/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
methods=["POST"],
)
@api_web_bp.route(
"/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
def user_role_add(uid: int, role_name: str, dept: str = None):
"""Add a role to the user"""
user: User = User.query.get_or_404(uid)
role: Role = Role.query.filter_by(name=role_name).first_or_404()
if dept is not None: # check
_ = Departement.query.filter_by(acronym=dept).first_or_404()
allowed_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
if (None not in allowed_depts) and (dept not in allowed_depts):
return error_response(403, "user_role_add: departement non autorise")
user.add_role(role, dept)
db.session.add(user)
db.session.commit()
return jsonify(user.to_dict())
@bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
@bp.route(
"/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
methods=["POST"],
)
@api_web_bp.route(
"/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
def user_role_remove(uid: int, role_name: str, dept: str = None):
"""Remove the role from the user"""
user: User = User.query.get_or_404(uid)
role: Role = Role.query.filter_by(name=role_name).first_or_404()
if dept is not None: # check
_ = Departement.query.filter_by(acronym=dept).first_or_404()
allowed_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
if (None not in allowed_depts) and (dept not in allowed_depts):
return error_response(403, "user_role_remove: departement non autorise")
query = UserRole.query.filter(UserRole.role == role, UserRole.user == user)
if dept is not None:
query = query.filter(UserRole.dept == dept)
user_role = query.first()
if user_role:
db.session.delete(user_role)
db.session.add(user)
db.session.commit()
return jsonify(user.to_dict())
@bp.route("/roles")
@api_web_bp.route("/roles")
@login_required
@scodoc
@permission_required(Permission.ScoView)
def list_roles():
"""Tous les rôles définis"""
return jsonify([role.to_dict() for role in Role.query])
@bp.route("/permissions")
@api_web_bp.route("/permissions")
@login_required
@scodoc
@permission_required(Permission.ScoView)
def list_permissions():
"""Liste des noms de permissions définies"""
return jsonify(list(Permission.permission_by_name.keys()))