# -*- coding: UTF-8 -* """Decorators for permissions, roles and ScoDoc7 Zope compatibility """ import functools from functools import wraps import inspect import types import logging import werkzeug from werkzeug.exceptions import BadRequest import flask from flask import g from flask import abort, current_app from flask import request from flask_login import current_user from flask_login import login_required from flask import current_app import app class ZUser(object): "Emulating Zope User" def __init__(self): "create, based on `flask_login.current_user`" self.username = current_user.user_name def __str__(self): return self.username def has_permission(self, perm, dept=None): """check if this user as the permission `perm` in departement given by `g.scodoc_dept`. """ raise NotImplementedError() class ZRequest(object): "Emulating Zope 2 REQUEST" def __init__(self): self.URL = request.base_url self.URL0 = self.URL self.BASE0 = request.url_root self.QUERY_STRING = request.query_string.decode( "utf-8" ) # query_string is bytes self.REQUEST_METHOD = request.method self.AUTHENTICATED_USER = current_user self.REMOTE_ADDR = request.remote_addr if request.method == "POST": # request.form is a werkzeug.datastructures.ImmutableMultiDict # must copy to get a mutable version (needed by TrivialFormulator) self.form = request.form.copy() if request.files: # Add files in form: self.form.update(request.files) for k in request.form: if k.endswith(":list"): self.form[k[:-5]] = request.form.getlist(k) elif request.method == "GET": self.form = {} for k in request.args: # current_app.logger.debug("%s\t%s" % (k, request.args.getlist(k))) if k.endswith(":list"): self.form[k[:-5]] = request.args.getlist(k) else: self.form[k] = request.args[k] # current_app.logger.info("ZRequest.form=%s" % str(self.form)) self.RESPONSE = ZResponse() def __str__(self): return """REQUEST URL={r.URL} QUERY_STRING={r.QUERY_STRING} REQUEST_METHOD={r.REQUEST_METHOD} AUTHENTICATED_USER={r.AUTHENTICATED_USER} form={r.form} """.format( r=self ) class ZResponse(object): "Emulating Zope 2 RESPONSE" def __init__(self): self.headers = {} def redirect(self, url): # current_app.logger.debug("ZResponse redirect to:" + str(url)) return flask.redirect(url) # http 302 def setHeader(self, header, value): self.headers[header.lower()] = value def scodoc(func): """Décorateur pour toutes les fonctions ScoDoc Affecte le département à g et ouvre la connexion à la base Set `g.scodoc_dept` and `g.scodoc_dept_id` if `scodoc_dept` is present in the argument (for routes like `//Scolarite/sco_exemple`). """ @wraps(func) def scodoc_function(*args, **kwargs): if "scodoc_dept" in kwargs: dept_acronym = kwargs["scodoc_dept"] current_app.logger.info("setting dept to " + dept_acronym) app.set_sco_dept(dept_acronym) del kwargs["scodoc_dept"] elif not hasattr(g, "scodoc_dept"): current_app.logger.info("setting dept to None") g.scodoc_dept = None g.scodoc_dept_id = -1 # invalide return func(*args, **kwargs) return scodoc_function def permission_required(permission): def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): # current_app.logger.info("PERMISSION; kwargs=%s" % str(kwargs)) if "scodoc_dept" in kwargs: g.scodoc_dept = kwargs["scodoc_dept"] del kwargs["scodoc_dept"] # current_app.logger.info( # "permission_required: %s in %s" % (permission, g.scodoc_dept) # ) scodoc_dept = getattr(g, "scodoc_dept", None) if not current_user.has_permission(permission, scodoc_dept): abort(403) return f(*args, **kwargs) return login_required(decorated_function) return decorator def admin_required(f): from app.auth.models import Permission return permission_required(Permission.ScoSuperAdmin)(f) def scodoc7func(context): """Décorateur pour intégrer les fonctions Zope 2 de ScoDoc 7. Si on a un kwarg `scodoc_dept`(venant de la route), le stocke dans `g.scodoc_dept`. Ajoute l'argument REQUEST s'il est dans la signature de la fonction. Les paramètres de la query string deviennent des (keywords) paramètres de la fonction. """ def s7_decorator(func): @wraps(func) def scodoc7func_decorator(*args, **kwargs): """Decorator allowing legacy Zope published methods to be called via Flask routes without modification. There are two cases: the function can be called 1. via a Flask route ("top level call") 2. or be called directly from Python. If called via a route, this decorator setups a REQUEST object (emulating Zope2 REQUEST) """ # Détermine si on est appelé via une route ("toplevel") # ou par un appel de fonction python normal. top_level = not hasattr(g, "zrequest") if not top_level: # ne "redécore" pas return func(*args, **kwargs) # --- Emulate Zope's REQUEST REQUEST = ZRequest() g.zrequest = REQUEST req_args = REQUEST.form # args from query string (get) or form (post) # --- Add positional arguments pos_arg_values = [] argspec = inspect.getfullargspec(func) # current_app.logger.info("argspec=%s" % str(argspec)) nb_default_args = len(argspec.defaults) if argspec.defaults else 0 if nb_default_args: arg_names = argspec.args[:-nb_default_args] else: arg_names = argspec.args for arg_name in arg_names: if arg_name == "REQUEST": # special case pos_arg_values.append(REQUEST) elif arg_name == "context": pos_arg_values.append(context) else: v = req_args[arg_name] # try to convert all arguments to INTEGERS # necessary for db ids and boolean values try: v = int(v) except ValueError: pass pos_arg_values.append(v) # current_app.logger.info("pos_arg_values=%s" % pos_arg_values) # current_app.logger.info("req_args=%s" % req_args) # Add keyword arguments if nb_default_args: for arg_name in argspec.args[-nb_default_args:]: if arg_name == "REQUEST": # special case kwargs[arg_name] = REQUEST elif arg_name in req_args: # set argument kw optionnel v = req_args[arg_name] # try to convert all arguments to INTEGERS # necessary for db ids and boolean values try: v = int(v) except (ValueError, TypeError): pass kwargs[arg_name] = v # current_app.logger.info( # "scodoc7func_decorator: top_level=%s, pos_arg_values=%s, kwargs=%s" # % (top_level, pos_arg_values, kwargs) # ) value = func(*pos_arg_values, **kwargs) if not top_level: return value else: if isinstance(value, werkzeug.wrappers.response.Response): return value # redirected # Build response, adding collected http headers: headers = [] kw = {"response": value, "status": 200} if g.zrequest: headers = g.zrequest.RESPONSE.headers if not headers: # no customized header, speedup: return value if "content-type" in headers: kw["mimetype"] = headers["content-type"] r = flask.Response(**kw) for h in headers: r.headers[h] = headers[h] return r return scodoc7func_decorator return s7_decorator # Le "context" de ScoDoc7 class ScoDoc7Context(object): """Context object for legacy Zope methods. Mainly used to call published methods, as context.function(...) """ def __init__(self, name=""): self.name = name # logging.getLogger(__name__).info("created %s" % self) def populate(self, globals_dict): # logging.getLogger(__name__).info("populating context %s" % self) for k in globals_dict: if (not k.startswith("_")) and ( isinstance(globals_dict[k], types.FunctionType) ): setattr(self, k, globals_dict[k].__get__(self)) def __repr__(self): return "ScoDoc7Context('%s')" % self.name