# -*- coding: UTF-8 -*-
"""Users and Roles models for ScoDoc."""

import base64
from datetime import datetime, timedelta
from hashlib import md5
import json
import os
from time import time

from flask import current_app, url_for
from flask_login import UserMixin, AnonymousUserMixin
from werkzeug.security import generate_password_hash, check_password_hash
import jwt

from app import db, login
from app.scodoc.sco_permissions import Permission
from app.scodoc.sco_roles_default import SCO_ROLES_DEFAULTS


class User(UserMixin, db.Model):
    """ScoDoc users, handled by Flask / SQLAlchemy"""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), index=True, unique=True)
    email = db.Column(db.String(120), index=True, unique=True)
    password_hash = db.Column(db.String(128))
    about_me = db.Column(db.String(140))
    last_seen = db.Column(db.DateTime, default=datetime.utcnow)
    # API token and its expiration date (see get_token / check_token)
    token = db.Column(db.String(32), index=True, unique=True)
    token_expiration = db.Column(db.DateTime)
    # View-only list of roles, through the "user_role" association table.
    # Role assignments are modified through `user_roles` (backref declared
    # on UserRole), not through this attribute.
    roles = db.relationship("Role", secondary="user_role", viewonly=True)
    # Expose the Permission class as an attribute of User
    Permission = Permission

    def __init__(self, **kwargs):
        """Create a user.

        If no role is given and the email is the one configured as
        ``SCODOC_ADMIN_MAIL``, the user gets the "Admin" role for all
        departments (dept=None) and the session is committed.
        """
        self.roles = []
        super().__init__(**kwargs)
        if (
            not self.roles
            and self.email
            and self.email == current_app.config["SCODOC_ADMIN_MAIL"]
        ):
            # super-admin
            admin_role = Role.query.filter_by(name="Admin").first()
            # Internal invariant: the "Admin" role must already exist
            assert admin_role
            self.add_role(admin_role, None)
            db.session.commit()
        current_app.logger.info("creating user with roles={}".format(self.roles))

    def __repr__(self):
        return "<User {}>".format(self.username)

    def __str__(self):
        return self.username

    def set_password(self, password):
        """Set password. An empty or None password clears the stored hash."""
        if password:
            self.password_hash = generate_password_hash(password)
        else:
            self.password_hash = None

    def check_password(self, password):
        """Check given password vs current one.

        Returns `True` if the password matched, `False` otherwise.
        """
        if not self.password_hash:
            # user without password can't login
            return False
        return check_password_hash(self.password_hash, password)

    def get_reset_password_token(self, expires_in=600):
        """Return a signed JWT (as str) allowing to reset this user's password.

        Args:
            expires_in: validity duration of the token, in seconds.
        """
        token = jwt.encode(
            {"reset_password": self.id, "exp": time() + expires_in},
            current_app.config["SECRET_KEY"],
            algorithm="HS256",
        )
        # PyJWT < 2 returns bytes, PyJWT >= 2 returns str: accept both.
        if isinstance(token, bytes):
            token = token.decode("utf-8")
        return token

    @staticmethod
    def verify_reset_password_token(token):
        """Return the user designated by a reset-password token.

        Returns None if the token cannot be decoded (invalid, expired,
        missing claim...) or if no user has the encoded id.
        """
        try:
            user_id = jwt.decode(
                token, current_app.config["SECRET_KEY"], algorithms=["HS256"]
            )["reset_password"]
        except Exception:  # any decoding problem means "no valid token"
            return None
        return User.query.get(user_id)

    def to_dict(self, include_email=False):
        """Return a dict representation of this user.

        Args:
            include_email: if true, add the "email" key.

        "last_seen" is an ISO string suffixed with "Z", or None when the
        attribute is not set yet.
        """
        last_seen = self.last_seen
        data = {
            "id": self.id,
            "username": self.username,
            "last_seen": None if last_seen is None else last_seen.isoformat() + "Z",
            "about_me": self.about_me,
        }
        if include_email:
            data["email"] = self.email
        return data

    def from_dict(self, data, new_user=False):
        """Update this user from `data`.

        Only the keys "username", "email" and "about_me" are copied.
        The "password" key is used only when `new_user` is true.
        """
        for field in ["username", "email", "about_me"]:
            if field in data:
                setattr(self, field, data[field])
        if new_user and "password" in data:
            self.set_password(data["password"])

    def get_token(self, expires_in=3600):
        """Return the API token of this user, generating a new one if needed.

        The current token is reused when it remains valid for more than
        60 seconds. Otherwise a new random token is generated, valid for
        `expires_in` seconds, and the user is added to the session
        (the session is not committed here).
        """
        now = datetime.utcnow()
        if (
            self.token
            and self.token_expiration  # a token without expiration is renewed
            and self.token_expiration > now + timedelta(seconds=60)
        ):
            return self.token
        # 24 random bytes give a 32 characters base64 string
        self.token = base64.b64encode(os.urandom(24)).decode("utf-8")
        self.token_expiration = now + timedelta(seconds=expires_in)
        db.session.add(self)
        return self.token

    def revoke_token(self):
        """Invalidate the current token by setting its expiration in the past."""
        self.token_expiration = datetime.utcnow() - timedelta(seconds=1)

    @staticmethod
    def check_token(token):
        """Return the user owning `token`, or None if the token is empty,
        unknown, or expired.
        """
        if not token:
            # never query on a NULL/empty token
            return None
        user = User.query.filter_by(token=token).first()
        if (
            user is None
            or user.token_expiration is None
            or user.token_expiration < datetime.utcnow()
        ):
            return None
        return user

    # Permissions management:
    def has_permission(self, perm, dept):
        """Check if user has permission `perm` in given `dept`.

        Emulate Zope `has_permission`

        Args:
            perm: integer, one of the value defined in Permission class.
            dept: department in which the permission is checked.

        Returns:
            True if one of the user's roles grants `perm`, False otherwise.
        """
        # roles bound to this department, and roles with dept=None (super-admin)
        roles_in_dept = (
            UserRole.query.filter_by(user_id=self.id)
            .filter((UserRole.dept == dept) | UserRole.dept.is_(None))
            .all()
        )
        return any(
            user_role.role.has_permission(perm) for user_role in roles_in_dept
        )

    # Role management
    def add_role(self, role, dept):
        """Add a role to this user.

        :param role: Role to add.
        :param dept: department of the role (None: all departments).
        """
        self.user_roles.append(UserRole(user=self, role=role, dept=dept))

    def add_roles(self, roles, dept):
        """Add roles to this user.

        :param roles: Roles to add.
        :param dept: department of the roles (None: all departments).
        """
        for role in roles:
            self.add_role(role, dept)

    def set_roles(self, roles, dept):
        """Replace the role assignments of this user by `roles` in `dept`."""
        self.user_roles = [UserRole(user=self, role=r, dept=dept) for r in roles]

    def get_roles(self):
        """Iterate over the roles of this user."""
        yield from self.roles

    def is_administrator(self):
        """True if the user has the ScoSuperAdmin permission with dept=None."""
        return self.has_permission(Permission.ScoSuperAdmin, None)


class AnonymousUser(AnonymousUserMixin):
    """Non-authenticated user: has no permission at all."""

    def has_permission(self, perm, dept=None):
        return False

    def is_administrator(self):
        return False


login.anonymous_user = AnonymousUser


class Role(db.Model):
    """Roles for ScoDoc"""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    default = db.Column(db.Boolean, default=False, index=True)
    permissions = db.Column(db.BigInteger)  # 64 bits
    users = db.relationship("User", secondary="user_role", viewonly=True)
    # __table_args__ = (db.UniqueConstraint("name", "dept", name="_rolename_dept_uc"),)

    def __init__(self, **kwargs):
        """Create a role; permissions default to 0 (none)."""
        super().__init__(**kwargs)
        if self.permissions is None:
            self.permissions = 0

    def __repr__(self):
        # permissions are displayed in binary, on Permission.NBITS digits
        return "<Role {} perm={:0{w}b}>".format(
            self.name,
            self.permissions & ((1 << Permission.NBITS) - 1),
            w=Permission.NBITS,
        )

    def add_permission(self, perm):
        """Set the bit(s) of `perm` in this role's permissions."""
        self.permissions |= perm

    def remove_permission(self, perm):
        """Clear the bit(s) of `perm` in this role's permissions."""
        self.permissions = self.permissions & ~perm

    def reset_permissions(self):
        """Remove all permissions."""
        self.permissions = 0

    def has_permission(self, perm):
        """True if all the bits of `perm` are set in this role's permissions."""
        return (self.permissions & perm) == perm

    @staticmethod
    def insert_roles():
        """Create default roles.

        For each entry of SCO_ROLES_DEFAULTS, create the role if it does
        not exist, then reset its permissions to the listed ones.
        The role named "Observateur" is flagged as the default one.
        Commits the session.
        """
        default_role = "Observateur"
        for r, permissions in SCO_ROLES_DEFAULTS.items():
            role = Role.query.filter_by(name=r).first()
            if role is None:
                role = Role(name=r)
            role.reset_permissions()
            for perm in permissions:
                role.add_permission(perm)
            role.default = role.name == default_role
            db.session.add(role)
        db.session.commit()

    @staticmethod
    def get_named_role(name):
        """Returns existing role with given name, or None."""
        return Role.query.filter_by(name=name).first()


class UserRole(db.Model):
    """Associate user to role, in a dept.

    If dept is None, the role applies to all departments (eg super admin).
    """

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey("user.id"))
    role_id = db.Column(db.Integer, db.ForeignKey("role.id"))
    dept = db.Column(db.String(64))
    # Both relationships declare a `user_roles` backref (on User and on Role)
    user = db.relationship(
        User, backref=db.backref("user_roles", cascade="all, delete-orphan")
    )
    role = db.relationship(
        Role, backref=db.backref("user_roles", cascade="all, delete-orphan")
    )

    def __repr__(self):
        return "<UserRole u={} r={} dept={}>".format(self.user, self.role, self.dept)


@login.user_loader
def load_user(id):  # parameter name kept for backward compatibility
    """Flask-Login user loader: return the user with given id, or None.

    A malformed id gives None instead of raising an exception.
    """
    try:
        user_id = int(id)
    except (TypeError, ValueError):
        return None
    return User.query.get(user_id)