##############################################################################
# ScoDoc
# Copyright (c) 1999 - 2023 Emmanuel Viennet.  All rights reserved.
# See LICENSE
##############################################################################

"""
    ScoDoc 9 API: access to user accounts, roles and permissions
"""

from flask import g, jsonify, request
from flask_login import current_user, login_required

from app import db, log
from app.api import api_bp as bp, api_web_bp
from app.scodoc.sco_utils import json_error
from app.auth.models import User, Role, UserRole
from app.auth.models import is_valid_password
from app.decorators import scodoc, permission_required
from app.models import Departement
from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_permissions import Permission
from app.scodoc import sco_utils as scu


@bp.route("/user/<int:uid>")
@api_web_bp.route("/user/<int:uid>")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
def user_info(uid: int):
    """Return the JSON description of the user account with id `uid`.

    Responds 404 when the account does not exist. When a department is set
    on the request (`g.scodoc_dept`), also responds 404 if the current user
    has ScoUsersView neither globally nor in the target user's department.
    """
    user: User = User.query.get(uid)
    if user is None:
        return json_error(404, "user not found")
    if g.scodoc_dept:
        allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersView)
        # None in allowed_depts means the permission holds in all departments.
        # Same 404 as above so the existence of the account is not revealed.
        if (None not in allowed_depts) and (user.dept not in allowed_depts):
            return json_error(404, "user not found")

    return jsonify(user.to_dict())


@bp.route("/users/query")
@api_web_bp.route("/users/query")
@login_required
@scodoc
@permission_required(Permission.ScoView)
def users_info_query():
    """List users, filtered by department, active flag or start of name.

        /users/query?departement=dept_acronym&active=1&starts_with=<prefix>

    Only the users "accessible" to the current user (according to their
    permissions) are returned.
    When accessed through the web API, the department in the URL is ignored:
    only the permissions of the current user are taken into account.
    """
    query = User.query
    active = request.args.get("active")
    if active is not None:
        # Query-string values are strings: bool(str(x)) would be True for any
        # non-empty value (including "0"). Use the same parser as
        # user_create / user_edit.
        # NOTE(review): presumably to_bool maps "0"/"false" to False - confirm
        # in sco_utils.
        active = scu.to_bool(active)
        query = query.filter_by(active=active)
    departement = request.args.get("departement")
    if departement is not None:
        # An empty "departement=" selects users whose dept is NULL
        query = query.filter_by(dept=departement or None)
    starts_with = request.args.get("starts_with")
    if starts_with is not None:
        # remove the LIKE wildcards % and _ for security
        starts_with = starts_with.translate({ord(c): None for c in "%_"})
        query = query.filter(User.nom.ilike(starts_with + "%"))
    # Filter according to permissions: keep users for which the current user
    # holds a role granting ScoUsersView, either in the user's department or
    # with no department restriction (UserRole.dept NULL).
    query = (
        query.join(UserRole, (UserRole.dept == User.dept) | (UserRole.dept.is_(None)))
        .filter(UserRole.user == current_user)
        .join(Role, UserRole.role_id == Role.id)
        .filter(Role.permissions.op("&")(Permission.ScoUsersView) != 0)
    )

    query = query.order_by(User.user_name)
    return jsonify([user.to_dict() for user in query])


@bp.route("/user/create", methods=["POST"])
@api_web_bp.route("/user/create", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoUsersAdmin)
def user_create():
    """Create a user account.

    The request content type should be "application/json":
    {
        "user_name": str,
        "dept": str or null,
        "nom": str,
        "prenom": str,
        "active":bool (default True)
    }
    A "dept" of "@all" is treated like null.

    Errors: 404 if user_name is empty or already used, or if the department
    does not exist; 403 if the current user is not ScoUsersAdmin for "dept".
    """
    data = request.get_json(force=True)  # may raise 400 Bad Request
    user_name = data.get("user_name")
    if not user_name:
        return json_error(404, "empty user_name")
    user = User.query.filter_by(user_name=user_name).first()
    if user:
        return json_error(404, f"user_create: user {user} already exists\n")
    dept = data.get("dept")
    if dept == "@all":
        dept = None
    allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersAdmin)
    # None in allowed_depts means the permission holds in all departments
    if (None not in allowed_depts) and (dept not in allowed_depts):
        return json_error(403, "user_create: departement non autorise")
    if (dept is not None) and (
        Departement.query.filter_by(acronym=dept).first() is None
    ):
        return json_error(404, "user_create: departement inexistant")
    nom = data.get("nom")
    prenom = data.get("prenom")
    active = scu.to_bool(data.get("active", True))
    user = User(user_name=user_name, active=active, dept=dept, nom=nom, prenom=prenom)
    db.session.add(user)
    db.session.commit()
    return jsonify(user.to_dict())


@bp.route("/user/<int:uid>/edit", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/edit", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoUsersAdmin)
def user_edit(uid: int):
    """Modify a user account.

    Editable fields (JSON body, all optional):
    {
        "dept": str or null,
        "nom": str,
        "prenom": str,
        "active":bool
    }
    A "dept" of "@all" is treated like null.

    Errors: 404 if the user or the destination department does not exist;
    403 if the current user is not ScoUsersAdmin both in the user's current
    department and, when "dept" is given, in the destination department.
    """
    data = request.get_json(force=True)  # may raise 400 Bad Request
    user: User = User.query.get_or_404(uid)
    # The current user must have the permission in the origin department and
    # in the destination one. The origin check is done even when "dept" is
    # not part of the request: otherwise an admin of one department could
    # edit users of any other department simply by omitting the key.
    orig_dept = user.dept
    allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersAdmin)
    # None in allowed_depts means the permission holds in all departments
    all_depts_allowed = None in allowed_depts
    if (not all_depts_allowed) and (orig_dept not in allowed_depts):
        return json_error(403, "user_edit: departement non autorise")
    # False is the "key absent" marker, since null is a meaningful value here
    dest_dept = data.get("dept", False)
    if dest_dept is not False:
        if dest_dept == "@all":
            dest_dept = None
        if (not all_depts_allowed) and (dest_dept not in allowed_depts):
            return json_error(403, "user_edit: departement non autorise")
        if dest_dept != orig_dept:
            if (dest_dept is not None) and (
                Departement.query.filter_by(acronym=dest_dept).first() is None
            ):
                return json_error(404, "user_edit: departement inexistant")
            user.dept = dest_dept

    user.nom = data.get("nom", user.nom)
    user.prenom = data.get("prenom", user.prenom)
    user.active = scu.to_bool(data.get("active", user.active))

    db.session.add(user)
    db.session.commit()
    return jsonify(user.to_dict())


@bp.route("/user/<int:uid>/password", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/password", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoUsersAdmin)
def user_password(uid: int):
    """Change the password of a user.

    Expected JSON body:
    {
        "password": str
    }
    Errors: 404 if the user does not exist or the password is missing,
    400 if the password is rejected by is_valid_password, 403 if the current
    user is not ScoUsersAdmin in the user's department.
    """
    data = request.get_json(force=True)  # may raise 400 Bad Request
    user: User = User.query.get_or_404(uid)
    password = data.get("password")
    if not password:
        return json_error(404, "user_password: missing password")
    if not is_valid_password(password):
        return json_error(400, "user_password: invalid password")
    allowed_depts = current_user.get_depts_with_permission(Permission.ScoUsersAdmin)
    # None in allowed_depts means the permission holds in all departments
    if (None not in allowed_depts) and ((user.dept not in allowed_depts)):
        return json_error(403, "user_password: departement non autorise")
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    return jsonify(user.to_dict())


@bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/add", methods=["POST"])
@bp.route(
    "/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
    methods=["POST"],
)
@api_web_bp.route(
    "/user/<int:uid>/role/<string:role_name>/add/departement/<string:dept>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
def user_role_add(uid: int, role_name: str, dept: str = None):
    """Add a role to the user, optionally restricted to department `dept`.

    Errors: 404 if the user, the role or the department does not exist;
    403 if the current user is not ScoSuperAdmin for `dept`.
    """
    user: User = User.query.get_or_404(uid)
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    if dept is not None:  # check that the department exists
        _ = Departement.query.filter_by(acronym=dept).first_or_404()
    allowed_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
    # None in allowed_depts means the permission holds in all departments
    if (None not in allowed_depts) and (dept not in allowed_depts):
        return json_error(403, "user_role_add: departement non autorise")
    user.add_role(role, dept)
    db.session.add(user)
    db.session.commit()
    return jsonify(user.to_dict())


@bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
@api_web_bp.route("/user/<int:uid>/role/<string:role_name>/remove", methods=["POST"])
@bp.route(
    "/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
    methods=["POST"],
)
@api_web_bp.route(
    "/user/<int:uid>/role/<string:role_name>/remove/departement/<string:dept>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
def user_role_remove(uid: int, role_name: str, dept: str = None):
    """Remove the role from the user.

    When `dept` is given, only an association in that department is
    considered; otherwise the first association of this role with the user
    is removed. Nothing happens if no association matches.

    Errors: 404 if the user, the role or the department does not exist;
    403 if the current user is not ScoSuperAdmin for `dept`.
    """
    user: User = User.query.get_or_404(uid)
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    if dept is not None:  # check that the department exists
        _ = Departement.query.filter_by(acronym=dept).first_or_404()
    allowed_depts = current_user.get_depts_with_permission(Permission.ScoSuperAdmin)
    # None in allowed_depts means the permission holds in all departments
    if (None not in allowed_depts) and (dept not in allowed_depts):
        return json_error(403, "user_role_remove: departement non autorise")

    query = UserRole.query.filter(UserRole.role == role, UserRole.user == user)
    if dept is not None:
        query = query.filter(UserRole.dept == dept)
    user_role = query.first()
    if user_role:
        db.session.delete(user_role)
        db.session.add(user)
        db.session.commit()
    return jsonify(user.to_dict())


@bp.route("/permissions")
@api_web_bp.route("/permissions")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
def list_permissions():
    """Return the list of the names of the defined permissions."""
    return jsonify(list(Permission.permission_by_name.keys()))


@bp.route("/role/<string:role_name>")
@api_web_bp.route("/role/<string:role_name>")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
def list_role(role_name: str):
    """Return the role named `role_name` (404 if it does not exist)."""
    return jsonify(Role.query.filter_by(name=role_name).first_or_404().to_dict())


@bp.route("/roles")
@api_web_bp.route("/roles")
@login_required
@scodoc
@permission_required(Permission.ScoUsersView)
def list_roles():
    """Return all the defined roles."""
    return jsonify([role.to_dict() for role in Role.query])


@bp.route(
    "/role/<string:role_name>/add_permission/<string:perm_name>",
    methods=["POST"],
)
@api_web_bp.route(
    "/role/<string:role_name>/add_permission/<string:perm_name>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
def role_permission_add(role_name: str, perm_name: str):
    """Add the permission `perm_name` to the role `role_name`.

    Responds 404 if the role or the permission is unknown.
    """
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    permission = Permission.get_by_name(perm_name)
    if permission is None:
        return json_error(404, "role_permission_add: permission inconnue")
    role.add_permission(permission)
    db.session.add(role)
    db.session.commit()
    return jsonify(role.to_dict())


@bp.route(
    "/role/<string:role_name>/remove_permission/<string:perm_name>",
    methods=["POST"],
)
@api_web_bp.route(
    "/role/<string:role_name>/remove_permission/<string:perm_name>",
    methods=["POST"],
)
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
def role_permission_remove(role_name: str, perm_name: str):
    """Remove the permission `perm_name` from the role `role_name`.

    Responds 404 if the role or the permission is unknown.
    """
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    permission = Permission.get_by_name(perm_name)
    if permission is None:
        return json_error(404, "role_permission_remove: permission inconnue")
    role.remove_permission(permission)
    db.session.add(role)
    db.session.commit()
    return jsonify(role.to_dict())


@bp.route("/role/create/<string:role_name>", methods=["POST"])
@api_web_bp.route("/role/create/<string:role_name>", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
def role_create(role_name: str):
    """Create a new role, optionally with permissions.

    JSON body:
    {
        "permissions" : [ 'ScoView', ... ]
    }
    Responds 404 if the role already exists or if set_named_permissions
    rejects the permissions (ScoValueError).
    """
    role: Role = Role.query.filter_by(name=role_name).first()
    if role:
        return json_error(404, "role_create: role already exists")
    role = Role(name=role_name)
    data = request.get_json(force=True)  # may raise 400 Bad Request
    permissions = data.get("permissions")
    if permissions:
        try:
            role.set_named_permissions(permissions)
        except ScoValueError:
            return json_error(404, "role_create: invalid permissions")
    db.session.add(role)
    db.session.commit()
    return jsonify(role.to_dict())


@bp.route("/role/<string:role_name>/edit", methods=["POST"])
@api_web_bp.route("/role/<string:role_name>/edit", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
def role_edit(role_name: str):
    """Edit a role: a new name and/or new permissions may be specified.

    JSON body (both keys optional):
    {
        "role_name" : new name of the role,
        "permissions" : [ 'ScoView', ... ]
    }
    Responds 404 if the role does not exist, if set_named_permissions rejects
    the permissions (ScoValueError), or if the new name is already used by
    another role.
    """
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    data = request.get_json(force=True)  # may raise 400 Bad Request
    # False is the "key absent" marker: any value actually present in the
    # body is passed to set_named_permissions.
    permissions = data.get("permissions", False)
    if permissions is not False:
        try:
            role.set_named_permissions(permissions)
        except ScoValueError:
            return json_error(404, "role_edit: invalid permissions")
    # From here on, role_name is the requested new name (not the URL one)
    role_name = data.get("role_name")
    if role_name and role_name != role.name:
        existing_role: Role = Role.query.filter_by(name=role_name).first()
        if existing_role:
            return json_error(404, "role_edit: role name already exists")
        role.name = role_name
    db.session.add(role)
    db.session.commit()
    return jsonify(role.to_dict())


@bp.route("/role/<string:role_name>/delete", methods=["POST"])
@api_web_bp.route("/role/<string:role_name>/delete", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoSuperAdmin)
def role_delete(role_name: str):
    """Delete the role `role_name` (404 if it does not exist)."""
    role: Role = Role.query.filter_by(name=role_name).first_or_404()
    db.session.delete(role)
    db.session.commit()
    return jsonify({"OK": True})