# -*- mode: python -*- # -*- coding: utf-8 -*- ############################################################################## # # Gestion scolarite IUT # # Copyright (c) 1999 - 2024 Emmanuel Viennet. All rights reserved. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # ############################################################################## import os from config import Config import re import requests import glob from datetime import date from flask import flash from flask_login import current_user from app.entreprises.models import ( Entreprise, EntrepriseCorrespondant, EntrepriseOffre, EntrepriseOffreDepartement, EntreprisePreferences, EntrepriseSite, EntrepriseHistorique, ) from app import email, db from app.scodoc import sco_excel from app.scodoc import sco_utils as scu from app.models import Departement from app.scodoc.sco_permissions import Permission ENTREPRISES_KEYS = [ "siret", "nom_entreprise", "adresse", "ville", "code_postal", "pays", ] SITES_KEYS = [ [ "siret_entreprise", "id_site", "nom_site", "adresse", "ville", "code_postal", "pays", ], [ "civilite", "nom", "prenom", "telephone", "mail", "poste", "service", "origine", "notes", ], ] CORRESPONDANTS_KEYS = [ "id", "civilite", "nom", "prenom", "telephone", "mail", "poste", "service", "origine", "notes", "nom_site", ] def get_depts(): """ Retourne une liste contenant les l'id des départements des roles de l'utilisateur courant """ depts = [] for role in current_user.user_roles: dept_id = get_dept_id_by_acronym(role.dept) if dept_id is not None: depts.append(dept_id) return depts def get_dept_id_by_acronym(acronym: str): """ Retourne l'id d'un departement a l'aide de son acronym """ dept = Departement.query.filter_by(acronym=acronym).first() if dept is not None: return dept.id return None def get_dept_acronym_by_id(id: int): """ Retourne l'acronym d'un departement a l'aide de son id """ dept = Departement.query.filter_by(id=id).first() if dept is not None: return dept.acronym return None def check_offre_depts(depts: list, offre_depts: list): """ Retourne vrai si l'utilisateur a le droit de visibilité sur l'offre """ if current_user.has_permission(Permission.RelationsEntrepEdit, None): return True for offre_dept in offre_depts: if offre_dept.dept_id in depts: return True return False def get_offre_files_and_depts(offre: EntrepriseOffre, depts: list): """ Retourne l'offre, les fichiers attachés a l'offre, les département liés a l'offre et le correspondant """ offre_depts = EntrepriseOffreDepartement.query.filter_by(offre_id=offre.id).all() correspondant = EntrepriseCorrespondant.query.filter_by( id=offre.correspondant_id ).first() if not offre_depts or check_offre_depts(depts, offre_depts): files = [] path = os.path.join( Config.SCODOC_VAR_DIR, "entreprises", f"{offre.entreprise_id}", f"{offre.id}", ) if os.path.exists(path): for dir in glob.glob( f"{path}/[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]-[0-9][0-9]" ): for _file in glob.glob(f"{dir}/*"): file = [os.path.basename(dir), os.path.basename(_file)] files.append(file) return [offre, files, offre_depts, correspondant] return None def get_offres_non_expirees_with_files(offres): """ Retourne une liste avec toutes les offres non expirées (offre, files, offre_depts, correspondant) """ depts = get_depts() offres_with_files = [] for offre in offres: if not offre.expired and ( offre.expiration_date is None or ( offre.expiration_date is not None and date.today() < offre.expiration_date ) ): offre_with_files = get_offre_files_and_depts(offre, depts) if offre_with_files is not None: offres_with_files.append(offre_with_files) return offres_with_files def get_offres_expirees_with_files(offres): """ Retourne une liste avec toutes les offres expirées (offre, files, offre_depts, correspondant) """ depts = get_depts() offres_with_files = [] for offre in offres: if offre.expired or ( offre.expiration_date is not None and date.today() > offre.expiration_date ): offre_with_files = get_offre_files_and_depts(offre, depts) if offre_with_files is not None: offres_with_files.append(offre_with_files) return offres_with_files def send_email_notifications_entreprise(subject: str, entreprise: Entreprise): txt = [ "Une entreprise est en attente de validation", "Entreprise:", f"\tnom: {entreprise.nom}", f"\tsiret: {entreprise.siret}", f"\tadresse: {entreprise.adresse}", f"\tcode postal: {entreprise.codepostal}", f"\tville: {entreprise.ville}", f"\tpays: {entreprise.pays}", ] txt = "\n".join(txt) email.send_email( subject, email.get_from_addr(), [EntreprisePreferences.get_email_notifications], txt, ) return txt def get_excel_book_are(export: bool = False): """ Retourne un Excel avec les 3 feuilles "Entreprises", "Sites" et "Correspondants" si export est True, remplit les feuilles avec les données a exporter """ entreprises_titles = ENTREPRISES_KEYS[:] sites_titles = SITES_KEYS[:] correspondants_titles = CORRESPONDANTS_KEYS[:] wb = sco_excel.ScoExcelBook() ws1 = wb.create_sheet("Entreprises") ws1.append_row( [ ws1.make_cell(it, style) for (it, style) in zip( entreprises_titles, [sco_excel.excel_make_style(bold=True)] * len(entreprises_titles), ) ] ) ws2 = wb.create_sheet("Sites") ws2.append_row( [ ws2.make_cell(it, style) for (it, style) in zip( sites_titles[0], [sco_excel.excel_make_style(bold=True)] * len(sites_titles[0]), ) ] + [ ws2.make_cell(it, style) for (it, style) in zip( sites_titles[1], [sco_excel.excel_make_style(bold=True, color=sco_excel.COLORS.RED)] * len(sites_titles[1]), ) ] ) ws3 = wb.create_sheet("Correspondants") ws3.append_row( [ ws3.make_cell(it, style) for (it, style) in zip( correspondants_titles, [sco_excel.excel_make_style(bold=True)] * len(correspondants_titles), ) ] ) if export: entreprises = Entreprise.query.filter_by(visible=True).all() sites = ( db.session.query(EntrepriseSite) .join(Entreprise, EntrepriseSite.entreprise_id == Entreprise.id) .filter_by(visible=True) .all() ) correspondants = ( db.session.query(EntrepriseCorrespondant) .join(EntrepriseSite, EntrepriseCorrespondant.site_id == EntrepriseSite.id) .join(Entreprise, EntrepriseSite.entreprise_id == Entreprise.id) .filter_by(visible=True) .all() ) entreprises_lines = [ [entreprise.to_dict().get(k, "") for k in ENTREPRISES_KEYS] for entreprise in entreprises ] sites_lines = [ [site.to_dict().get(k, "") for k in SITES_KEYS[0]] for site in sites ] correspondants_lines = [ [correspondant.to_dict().get(k, "") for k in CORRESPONDANTS_KEYS] for correspondant in correspondants ] for line in entreprises_lines: cells = [] for it in line: cells.append(ws1.make_cell(it)) ws1.append_row(cells) for line in sites_lines: cells = [] for it in line: cells.append(ws2.make_cell(it)) ws2.append_row(cells) for line in correspondants_lines: cells = [] for it in line: cells.append(ws3.make_cell(it)) ws3.append_row(cells) return wb def check_entreprises_import(m): """ Verifie la feuille Excel "Entreprises" de l'importation données """ entreprises_import = [] siret_list = [] ligne = 1 if m[0] != ENTREPRISES_KEYS: flash( f'Veuillez utilisez la feuille excel à remplir (Feuille "Entreprises", ligne {ligne})' ) return False l = list_to_dict(m) for entreprise_data in l: ligne += 1 entreprise_data["siret"] = entreprise_data["siret"].replace(" ", "") if ( check_entreprise_import(entreprise_data) and entreprise_data["siret"] not in siret_list ): siret_list.append(entreprise_data["siret"]) entreprise = Entreprise.query.filter_by( siret=entreprise_data["siret"], visible=True ).first() if entreprise is None: entreprise_import = Entreprise( siret=entreprise_data["siret"], nom=entreprise_data["nom_entreprise"], adresse=entreprise_data["adresse"], ville=entreprise_data["ville"], codepostal=entreprise_data["code_postal"], pays=entreprise_data["pays"] if entreprise_data["pays"] else "FRANCE", visible=True, ) entreprises_import.append(entreprise_import) else: entreprise.nom = entreprise_data["nom_entreprise"] entreprise.adresse = entreprise_data["adresse"] entreprise.ville = entreprise_data["ville"] entreprise.codepostal = entreprise_data["code_postal"] entreprise.pays = ( entreprise_data["pays"] if entreprise_data["pays"] else "FRANCE" ) else: flash( f'Erreur lors de l\'importation (Feuille "Entreprises", ligne {ligne})' ) return False if len(entreprises_import) > 0: log = EntrepriseHistorique( authenticated_user=current_user.user_name, text=f"Importation de {len(entreprises_import)} entreprise(s)", ) db.session.add(log) return entreprises_import def check_entreprise_import(entreprise_data): """ Verifie les données d'une ligne Excel (entreprise) """ champs_obligatoires = ["siret", "nom_entreprise", "adresse", "ville", "code_postal"] for key, value in entreprise_data.items(): # champs obligatoires if key in champs_obligatoires and value == "": return False siret = entreprise_data["siret"] if EntreprisePreferences.get_check_siret(): if re.match("^\d{14}$", siret) is None: return False else: try: req = requests.get( f"https://entreprise.data.gouv.fr/api/sirene/v1/siret/{siret}", timeout=scu.SCO_EXT_TIMEOUT, ) if req.status_code != 200: return False except requests.ConnectionError: return False return True def check_sites_import(m): """ Verifie la feuille Excel "Sites" de l'importation données """ sites_import = [] correspondants_import = [] ligne = 1 if m[0] != sum(SITES_KEYS, []): flash( f'Veuillez utilisez la feuille excel à remplir (Feuille "Sites", ligne {ligne})' ) return False, False l = list_to_dict(m) for site_data in l: ligne += 1 site_data["siret_entreprise"] = site_data["siret_entreprise"].replace(" ", "") if check_site_import(site_data): entreprise = Entreprise.query.filter_by( siret=site_data["siret_entreprise"], visible=True ).first() if site_data["id_site"] == "": site_import = EntrepriseSite( entreprise_id=entreprise.id, nom=site_data["nom_site"], adresse=site_data["adresse"], codepostal=site_data["code_postal"], ville=site_data["ville"], pays=site_data["pays"], ) if site_data["civilite"] == "": sites_import.append(site_import) else: correspondant_import = EntrepriseCorrespondant( civilite=site_data["civilite"], nom=site_data["nom"], prenom=site_data["prenom"], telephone=site_data["telephone"], mail=site_data["mail"], poste=site_data["poste"], service=site_data["service"], origine=site_data["origine"], notes=site_data["notes"], ) sites_import.append(site_import) correspondants_import.append([site_import, correspondant_import]) else: site = EntrepriseSite.query.filter_by(id=site_data["id_site"]).first() site.nom = site_data["nom_site"] site.adresse = site_data["adresse"] site.codepostal = site_data["code_postal"] site.ville = site_data["ville"] site.pays = site_data["pays"] if site_data["civilite"] != "": correspondant_import = EntrepriseCorrespondant( site_id=site.id, civilite=site_data["civilite"], nom=site_data["nom"], prenom=site_data["prenom"], telephone=site_data["telephone"], mail=site_data["mail"], poste=site_data["poste"], service=site_data["service"], origine=site_data["origine"], notes=site_data["notes"], ) correspondants_import.append([None, correspondant_import]) else: flash(f'Erreur lors de l\'importation (Feuille "Sites", ligne {ligne})') return False, False if len(sites_import) > 0: log = EntrepriseHistorique( authenticated_user=current_user.user_name, text=f"Importation de {len(sites_import)} site(s)", ) db.session.add(log) if len(correspondants_import) > 0: log = EntrepriseHistorique( authenticated_user=current_user.user_name, text=f"Importation de {len(correspondants_import)} correspondant(s)", ) db.session.add(log) return sites_import, correspondants_import def check_site_import(site_data): """ Verifie les données d'une ligne Excel (site) """ champs_obligatoires = [ "siret_entreprise", "nom_site", "adresse", "ville", "code_postal", "pays", ] for key, value in site_data.items(): # champs obligatoires if key in champs_obligatoires and value == "": return False if site_data["civilite"] != "": if check_correspondant_import(site_data) is False: return False entreprise = Entreprise.query.filter_by( siret=site_data["siret_entreprise"], visible=True ).first() if entreprise is None: return False site = EntrepriseSite.query.filter_by(nom=site_data["nom_site"]).first() if site_data["id_site"] == "" and site is not None: return False return True def check_correspondants_import(m): """ Verifie la feuille Excel "Correspondants" de l'importation données """ ligne = 1 if m[0] != CORRESPONDANTS_KEYS: flash( f'Veuillez utilisez la feuille excel à remplir (Feuille "Correspondants", ligne {ligne})' ) return False l = list_to_dict(m) for correspondant_data in l: ligne += 1 if correspondant_data["id"] == "": flash( f'Erreur lors de l\'importation (Feuille "Correspondants", ligne {ligne})' ) return False correspondant = EntrepriseCorrespondant.query.filter_by( id=correspondant_data["id"] ).first() if correspondant is not None and check_correspondant_import(correspondant_data): correspondant.civilite = correspondant_data["civilite"] correspondant.nom = correspondant_data["nom"] correspondant.prenom = correspondant_data["prenom"] correspondant.telephone = correspondant_data["telephone"] correspondant.mail = correspondant_data["mail"] correspondant.poste = correspondant_data["poste"] correspondant.service = correspondant_data["service"] correspondant.origine = correspondant_data["origine"] correspondant.notes = correspondant_data["notes"] else: flash( f'Erreur lors de l\'importation (Feuille "Correspondants", ligne {ligne})' ) return False return True def check_correspondant_import(correspondant_data): """ Verifie les données d'une ligne Excel (correspondant) """ champs_obligatoires = ["civilite", "nom", "prenom"] for key, value in correspondant_data.items(): # champs obligatoires if key in champs_obligatoires and value == "": return False # civilite entre H ou F if correspondant_data["civilite"].upper() not in ["H", "F"]: return False if ( correspondant_data["telephone"] == "" and correspondant_data["mail"] == "" ): # 1 moyen de contact return False if "siret_entreprise" in correspondant_data: # entreprise_id existant entreprise = Entreprise.query.filter_by( siret=correspondant_data["siret_entreprise"], visible=True ).first() if entreprise is None: return False # correspondant possède le meme nom et prénom dans la meme entreprise if correspondant_data["id_site"] != "": correspondant = EntrepriseCorrespondant.query.filter_by( nom=correspondant_data["nom"], prenom=correspondant_data["prenom"], site_id=correspondant_data["id_site"], ).first() if correspondant is not None: return False return True def list_to_dict(m): """ Transforme une liste de liste (matrice) en liste de dictionnaire (key = premiere liste de la matrice) """ l = [] for data in m[1:]: new_dict = {title: value.strip() for title, value in zip(m[0], data)} l.append(new_dict) return l