# -*- coding: UTF-8 -*- """Application Flask: ScoDoc """ from pprint import pprint as pp import re import sys import click import flask from flask.cli import with_appcontext from flask.templating import render_template from flask_login import login_user, logout_user, current_user import psycopg2 import sqlalchemy import app as mapp from app import create_app, cli, db from app import initialize_scodoc_database from app import clear_scodoc_cache from app import models from app.auth.models import User, Role, UserRole from app.entreprises.models import entreprises_reset_database from app.models import Departement, departements from app.models import Formation, UniteEns, Matiere, Module from app.models import FormSemestre, FormSemestreInscription from app.models import GroupDescr from app.models import Identite from app.models import ModuleImpl, ModuleImplInscription from app.models import Partition from app.models import ScolarAutorisationInscription, ScolarFormSemestreValidation from app.models.but_refcomp import ( ApcCompetence, ApcNiveau, ApcParcours, ApcReferentielCompetences, ) from app.models.but_validations import ApcValidationAnnee, ApcValidationRCUE from app.models.evaluations import Evaluation from app.scodoc import sco_dump_db from app.scodoc.sco_logos import make_logo_local from app.scodoc.sco_permissions import Permission from app.views import notes, scolar import app.scodoc.sco_utils as scu import tools from tools.fakedatabase import create_test_api_database from config import RunningConfig app = create_app(RunningConfig) cli.register(app) @app.context_processor def inject_sco_utils(): "Make scu available in all Jinja templates" return dict(scu=scu) @app.shell_context_processor def make_shell_context(): import numpy as np import pandas as pd import app as mapp # le package app from app.scodoc import notesdb as ndb from app.comp import res_sem from app.comp.res_but import ResultatsSemestreBUT from app.scodoc import sco_utils as scu return { "ApcCompetence": ApcCompetence, "ApcNiveau": ApcNiveau, "ApcParcours": ApcParcours, "ApcReferentielCompetences": ApcReferentielCompetences, "ApcValidationRCUE": ApcValidationRCUE, "ApcValidationAnnee": ApcValidationAnnee, "ctx": app.test_request_context(), "current_app": flask.current_app, "current_user": current_user, "Departement": Departement, "db": db, "Evaluation": Evaluation, "flask": flask, "Formation": Formation, "FormSemestre": FormSemestre, "FormSemestreInscription": FormSemestreInscription, "GroupDescr": GroupDescr, "Identite": Identite, "login_user": login_user, "logout_user": logout_user, "mapp": mapp, "Matiere": Matiere, "models": models, "Module": Module, "ModuleImpl": ModuleImpl, "ModuleImplInscription": ModuleImplInscription, "ndb": ndb, "notes": notes, "np": np, "Partition": Partition, "pd": pd, "Permission": Permission, "pp": pp, "res_sem": res_sem, "ResultatsSemestreBUT": ResultatsSemestreBUT, "Role": Role, "ScoDocSiteConfig": models.ScoDocSiteConfig, "scolar": scolar, "ScolarAutorisationInscription": ScolarAutorisationInscription, "ScolarFormSemestreValidation": ScolarFormSemestreValidation, "ScolarNews": models.ScolarNews, "scu": scu, "UniteEns": UniteEns, "User": User, "UserRole": UserRole, } # ctx.push() # admin = User.query.filter_by(user_name="admin").first() # login_user(admin) @app.cli.command() @click.option("--erase/--no-erase", default=False) def sco_db_init(erase=False): # sco-db-init """Initialize the database. Starts from an existing database and create all the necessary SQL tables and functions. """ if not app.config.get("SCODOC_ADMIN_MAIL"): sys.stderr.write( """La variable SCODOC_ADMIN_MAIL n'est pas positionnée: vérifier votre .env""" ) return 100 initialize_scodoc_database(erase=erase) @app.cli.command() @click.argument("database") def anonymize_db(database): # anonymize-db """Anonymise la base de nom indiqué (et non pas la base courante!)""" click.confirm( f"L'anonymisation va affecter la base {database} et PERDRE beaucoup de données.\nContinuer ?", abort=True, ) sco_dump_db.anonymize_db(database) click.echo(f"Base {database} pseudonymisée") @app.cli.command() def user_db_clear(): """Erase all users and roles from the database !""" click.echo("Erasing the users database !") _clear_users_db() def _clear_users_db(): """Erase (drop) all tables of users database !""" click.confirm( "This will erase all users and roles.\nAre you sure you want to continue?", abort=True, ) db.reflect() try: db.session.query(UserRole).delete() db.session.query(User).delete() db.session.commit() except: db.session.rollback() raise @app.cli.command() @click.argument("username") @click.argument("role") @click.argument("dept") @click.option("-n", "--nom", "nom") @click.option("-p", "--prenom", "prenom") def user_create(username, role, dept, nom=None, prenom=None): # user-create "Create a new user" r = Role.get_named_role(role) if not r: sys.stderr.write(f"user_create: role {role} does not exist\n") return 1 u = User.query.filter_by(user_name=username).first() if u: sys.stderr.write(f"user_create: user {u} already exists\n") return 2 if dept == "@all": dept = None u = User(user_name=username, dept=dept, nom=nom, prenom=prenom) u.add_role(r, dept) db.session.add(u) db.session.commit() click.echo(f"created user, login: {u.user_name}, with role {r} in dept. {dept}") @app.cli.command() @click.argument("username") def user_delete(username): # user-delete "Try to delete this user. Fails if it's associated to some scodoc objects." u = User.query.filter_by(user_name=username).first() if not u: sys.stderr.write(f"user_delete: user {username} not found\n") return 2 db.session.delete(u) try: db.session.commit() except (sqlalchemy.exc.IntegrityError, psycopg2.errors.ForeignKeyViolation): sys.stderr.write( f"""\nuser_delete: ne peux pas supprimer l'utilisateur {username}\ncar il est associé à des objets dans ScoDoc (modules, notes, ...).\n""" ) return 1 click.echo(f"deleted user, login: {username}") @app.cli.command() @click.argument("username") @click.password_option() def user_password(username, password=None): # user-password "Set (or change) user's password" if not password: sys.stderr.write("user_password: missing password") return 1 u = User.query.filter_by(user_name=username).first() if not u: sys.stderr.write(f"user_password: user {username} does not exists\n") return 1 u.set_password(password) db.session.add(u) db.session.commit() click.echo(f"changed password for user {u}") @app.cli.command() @click.argument("rolename") @click.argument("permissions", nargs=-1) def create_role(rolename, permissions): # create-role """Create a new role""" # Check rolename if not re.match(r"^[a-zA-Z0-9]+$", rolename): sys.stderr.write(f"create_role: invalid rolename {rolename}\n") return 1 # Check permissions permission_list = [] for permission_name in permissions: perm = Permission.get_by_name(permission_name) if not perm: sys.stderr.write(f"create_role: invalid permission name {perm}\n") sys.stderr.write( f"\tavailable permissions: {', '.join([ name for name in Permission.permission_by_name])}.\n" ) return 1 permission_list.append(perm) role = Role.query.filter_by(name=rolename).first() if role: sys.stderr.write(f"create_role: role {rolename} already exists\n") return 1 role = Role(name=rolename) for perm in permission_list: role.add_permission(perm) db.session.add(role) db.session.commit() @app.cli.command() def list_roles(): # list-roles """List all defined roles""" for role in Role.query: print(role) @app.cli.command() @click.argument("rolename") @click.option("-a", "--add", "addpermissionname") @click.option("-r", "--remove", "removepermissionname") def edit_role(rolename, addpermissionname=None, removepermissionname=None): # edit-role """Add [-a] and/or remove [-r] a permission to/from a role. In ScoDoc, permissions are not associated to users but to roles. Each user has a set of roles in each departement. Example: `flask edit-role -a ScoEditApo Ens` """ if addpermissionname: perm_to_add = Permission.get_by_name(addpermissionname) if not perm_to_add: sys.stderr.write( f"edit_role: permission {addpermissionname} does not exists\n" ) return 1 else: perm_to_add = None if removepermissionname: perm_to_remove = Permission.get_by_name(removepermissionname) if not perm_to_remove: sys.stderr.write( f"edit_role: permission {removepermissionname} does not exists\n" ) return 1 else: perm_to_remove = None role = Role.query.filter_by(name=rolename).first() if not role: sys.stderr.write(f"edit_role: role {rolename} does not exists\n") return 1 if perm_to_add: role.add_permission(perm_to_add) click.echo(f"adding permission {addpermissionname} to role {rolename}") if perm_to_remove: role.remove_permission(perm_to_remove) click.echo(f"removing permission {removepermissionname} from role {rolename}") if perm_to_add or perm_to_remove: db.session.add(role) db.session.commit() print(role) @app.cli.command() @click.argument("rolename") def delete_role(rolename): """Delete a role""" role = Role.query.filter_by(name=rolename).first() if role is None: sys.stderr.write(f"delete_role: role {rolename} does not exists\n") return 1 db.session.delete(role) db.session.commit() @app.cli.command() @click.argument("username") @click.option("-d", "--dept", "dept_acronym") @click.option("-a", "--add", "add_role_name") @click.option("-r", "--remove", "remove_role_name") def user_role(username, dept_acronym=None, add_role_name=None, remove_role_name=None): """Add or remove a role to the given user in the given dept""" user: User = User.query.filter_by(user_name=username).first() if not user: sys.stderr.write(f"user_role: user {username} does not exists\n") return 1 # Sans argument, affiche les rôles de l'utilisateur if dept_acronym is None and add_role_name is None and remove_role_name is None: print(f"Roles for user {user.user_name}") for user_role in sorted( user.user_roles, key=lambda ur: (ur.dept or "", ur.role.name) ): print(f"""{user_role.dept or "tous"}:\t{user_role.role.name}""") if dept_acronym: dept = models.Departement.query.filter_by(acronym=dept_acronym).first() if dept is None: sys.stderr.write(f"Erreur: le departement {dept_acronym} n'existe pas !\n") return 2 if add_role_name: role = Role.query.filter_by(name=add_role_name).first() if role is None: sys.stderr.write( f"""user_role: role {add_role_name} does not exists (use list-roles to display existing roles)\n""" ) return 2 user.add_role(role, dept_acronym) if remove_role_name: role = Role.query.filter_by(name=remove_role_name).first() if role is None: sys.stderr.write(f"user_role: role {remove_role_name} does not exists\n") return 2 user_role = UserRole.query.filter( UserRole.role == role, UserRole.user == user, UserRole.dept == dept_acronym ).first() if user_role: db.session.delete(user_role) db.session.commit() def abort_if_false(ctx, param, value): if not value: ctx.abort() @app.cli.command() @click.option( "-y", "--yes", is_flag=True, callback=abort_if_false, expose_value=False, prompt="""Attention: Cela va effacer toutes les données du département (étudiants, notes, formations, etc). Voulez-vous vraiment continuer ? """, ) @click.option( "-f", "--force", is_flag=True, help="ignore non-existing departement", ) @click.argument("dept") def delete_dept(dept, force=False): # delete-dept """Delete existing departement""" from app.scodoc import notesdb as ndb from app.scodoc import sco_dept msg = "" db.reflect() ndb.open_db_connection() d = models.Departement.query.filter_by(acronym=dept).first() if d is None and not force: sys.stderr.write(f"Erreur: le departement {dept} n'existe pas !\n") return 2 elif d: msg = sco_dept.delete_dept(d.id) db.session.commit() if msg: print(f"Erreur:\n {msg}") return 0 if not msg else 1 @app.cli.command() @click.argument("dept") def create_dept(dept): # create-dept "Create new departement" _ = departements.create_dept(dept) return 0 @app.cli.command() @click.argument("depts", nargs=-1) def list_depts(depts=""): # list-depts """If dept exists, print it, else nothing. Called without arguments, list all depts along with their ids. """ for dept in models.Departement.query.order_by(models.Departement.id): if not depts or dept.acronym in depts: print(f"{dept.id}\t{dept.acronym}") @app.cli.command() @click.option( "-n", "--name", is_flag=True, help="show database name instead of connexion string (required for " "dropdb/createdb commands)", ) def scodoc_database(name): # scodoc-database """print the database connexion string""" uri = app.config["SQLALCHEMY_DATABASE_URI"] if name: print(uri.split("/")[-1]) else: print(uri) @app.cli.command() @with_appcontext def import_scodoc7_users(): # import-scodoc7-users """Import users defined in ScoDoc7 postgresql database into ScoDoc 9 The old database SCOUSERS must be alive and readable by the current user. This script is typically run as unix user "scodoc". The original SCOUSERS database is left unmodified. """ messages = tools.import_scodoc7_user_db() click.echo("----") click.echo(f"import terminé: {len(messages)} warnings\n") click.echo("\n".join(messages) + "\n") @app.cli.command() @click.argument("dept") @click.argument("dept_db_name") @with_appcontext def import_scodoc7_dept(dept: str, dept_db_name: str = ""): # import-scodoc7-dept """Import département ScoDoc 7: dept: InfoComm, dept_db_name: SCOINFOCOMM""" dept_db_uri = f"postgresql:///{dept_db_name}" tools.import_scodoc7_dept(dept, dept_db_uri) @app.cli.command() @click.argument("dept", default="") @with_appcontext def migrate_scodoc7_dept_archives(dept: str): # migrate-scodoc7-dept-archives """Post-migration: renomme les archives en fonction des id de ScoDoc 9""" tools.migrate_scodoc7_dept_archives(dept) @app.cli.command() @click.argument("dept", default="") @with_appcontext def migrate_scodoc7_dept_logos(dept: str = ""): # migrate-scodoc7-dept-logos """Post-migration: renomme les logos en fonction des id / dept de ScoDoc 9""" tools.migrate_scodoc7_dept_logos(dept) @app.cli.command() @click.argument("logo", default=None) @click.argument("dept", default=None) @with_appcontext def localize_logo(logo: str = None, dept: str = None): # migrate-scodoc7-dept-logos """Make local to a dept a global logo (both logo and dept names are mandatory)""" if logo in ["header", "footer"]: print( f"Can't make logo '{logo}' local: add a local version throught configuration form instead" ) return make_logo_local(logoname=logo, dept_name=dept) @app.cli.command() @click.argument("formsemestre_id", type=click.INT) @click.argument("xlsfile", type=click.File("rb")) @click.argument("zipfile", type=click.File("rb")) def photos_import_files(formsemestre_id: int, xlsfile: str, zipfile: str): """Import des photos d'étudiants à partir d'une liste excel et d'un zip avec les images.""" from app.scodoc import sco_trombino, sco_photos from app.auth.models import get_super_admin formsemestre = db.session.get(FormSemestre, formsemestre_id) if not formsemestre: sys.stderr.write("photos-import-files: formsemestre_id invalide\n") return 2 with app.test_request_context(): mapp.set_sco_dept(formsemestre.departement.acronym) admin_user = get_super_admin() login_user(admin_user) def callback(etud, data, filename): return sco_photos.store_photo(etud, data, filename) ( ignored_zipfiles, unmatched_files, stored_etud_filename, ) = sco_trombino.zip_excel_import_files( xlsfile=xlsfile, zipfile=zipfile, callback=callback, filename_title="fichier_photo", ) print( render_template( "scolar/photos_import_files.txt", ignored_zipfiles=ignored_zipfiles, unmatched_files=unmatched_files, stored_etud_filename=stored_etud_filename, ) ) @app.cli.command() @click.option("--sanitize/--no-sanitize", default=False) @with_appcontext def clear_cache(sanitize): # clear-cache """Clear ScoDoc cache This cache (currently Redis) is persistent between invocation and it may be necessary to clear it during upgrades, development or tests. """ click.echo("Flushing Redis cache...") clear_scodoc_cache() if sanitize: # sanitizes all formations: click.echo("Checking formations...") for formation in Formation.query: formation.sanitize_old_formation() @app.cli.command() def init_test_database(): # init-test-database """Initialise les objets en base pour les tests API (à appliquer sur SCODOC_TEST ou SCODOC_DEV) """ click.echo("Initialisation base de test API...") ctx = app.test_request_context() ctx.push() admin = User.query.filter_by(user_name="admin").first() login_user(admin) create_test_api_database.init_test_database() def recursive_help(cmd, parent=None): ctx = click.core.Context(cmd, info_name=cmd.name, parent=parent) print(cmd.get_help(ctx)) print() commands = getattr(cmd, "commands", {}) for sub in commands.values(): recursive_help(sub, ctx) @app.cli.command() def entreprises_reset_db(): """Remet a zéro les tables du module relations entreprises""" click.confirm( "This will erase all data from the blueprint 'entreprises'.\nAre you sure you want to continue?", abort=True, ) db.reflect() try: entreprises_reset_database() except: db.session.rollback() raise @app.cli.command() def dumphelp(): """Génère la page d'aide complète pour la doc.""" recursive_help(app.cli) @app.cli.command() @click.option("-h", "--host", default="127.0.0.1", help="The interface to bind to.") @click.option("-p", "--port", default=5000, help="The port to bind to.") @click.option( "--length", default=25, help="Number of functions to include in the profiler report.", ) @click.option( "--profile-dir", default=None, help="Directory where profiler data files are saved." ) def profile(host, port, length, profile_dir): """Start the application under the code profiler.""" from werkzeug.middleware.profiler import ProfilerMiddleware from werkzeug.serving import run_simple app.wsgi_app = ProfilerMiddleware( app.wsgi_app, restrictions=[length], profile_dir=profile_dir ) run_simple( host, port, app, use_debugger=False ) # use run_simple instead of app.run()