# -*- mode: python -*- # -*- coding: utf-8 -*- ############################################################################## # # Gestion scolarite IUT # # Copyright (c) 1999 - 2021 Emmanuel Viennet. All rights reserved. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Emmanuel Viennet emmanuel.viennet@viennet.net # ############################################################################## """Liaison avec le portail ENT (qui donne accès aux infos Apogée) """ import os, time import urllib import xml import xml.sax.saxutils import xml.dom.minidom import datetime import sco_utils as scu from sco_utils import SCO_ENCODING from sco_permissions import ScoEtudInscrit from sco_exceptions import ScoValueError from notes_log import log SCO_CACHE_ETAPE_FILENAME = os.path.join(scu.SCO_TMPDIR, "last_etapes.xml") def has_portal(context): "True if we are connected to a portal" return get_portal_url(context) class PortalInterface: def __init__(self): self.warning = False def get_portal_url(self, context): "URL of portal" portal_url = context.get_preference("portal_url") if not self.warning: if portal_url: log("Portal URL=%s" % portal_url) else: log("Portal not configured") self.warning = True return portal_url def get_etapes_url(self, context): "Full URL of service giving list of etapes (in XML)" etapes_url = context.get_preference("etapes_url") if not etapes_url: # Default: portal_url = self.get_portal_url(context) if not portal_url: return None api_ver = self.get_portal_api_version(context) if api_ver > 1: etapes_url = portal_url + "scodocEtapes.php" else: etapes_url = portal_url + "getEtapes.php" return etapes_url def get_etud_url(self, context): "Full URL of service giving list of students (in XML)" etud_url = context.get_preference("etud_url") if not etud_url: # Default: portal_url = self.get_portal_url(context) if not portal_url: return None api_ver = self.get_portal_api_version(context) if api_ver > 1: etud_url = portal_url + "scodocEtudiant.php" else: etud_url = portal_url + "getEtud.php" return etud_url def get_photo_url(self, context): "Full URL of service giving photo of student" photo_url = context.get_preference("photo_url") if not photo_url: # Default: portal_url = self.get_portal_url(context) if not portal_url: return None api_ver = self.get_portal_api_version(context) if api_ver > 1: photo_url = portal_url + "scodocPhoto.php" else: photo_url = portal_url + "getPhoto.php" return photo_url def get_maquette_url(self, context): """Full URL of service giving Apogee maquette pour une étape (fichier "CSV")""" maquette_url = context.get_preference("maquette_url") if not maquette_url: # Default: portal_url = self.get_portal_url(context) if not portal_url: return None maquette_url = portal_url + "scodocMaquette.php" return maquette_url def get_portal_api_version(self, context): "API version of the portal software" api_ver = context.get_preference("portal_api") if not api_ver: # Default: api_ver = 1 return api_ver _PI = PortalInterface() get_portal_url = _PI.get_portal_url get_etapes_url = _PI.get_etapes_url get_etud_url = _PI.get_etud_url get_photo_url = _PI.get_photo_url get_maquette_url = _PI.get_maquette_url get_portal_api_version = _PI.get_portal_api_version def get_inscrits_etape(context, code_etape, anneeapogee=None, ntrials=2): """Liste des inscrits à une étape Apogée Result = list of dicts ntrials: try several time the same request, useful for some bad web services """ log("get_inscrits_etape: code=%s anneeapogee=%s" % (code_etape, anneeapogee)) if anneeapogee is None: anneeapogee = str(time.localtime()[0]) etud_url = get_etud_url(context) api_ver = get_portal_api_version(context) if not etud_url: return [] portal_timeout = context.get_preference("portal_timeout") if api_ver > 1: req = ( etud_url + "?" + urllib.urlencode((("etape", code_etape), ("annee", anneeapogee))) ) else: req = etud_url + "?" + urllib.urlencode((("etape", code_etape),)) actual_timeout = float(portal_timeout) / ntrials if portal_timeout > 0: actual_timeout = max(1, actual_timeout) for _ntrial in range(ntrials): doc = scu.query_portal(req, timeout=actual_timeout) if doc: break if not doc: raise ScoValueError("pas de réponse du portail ! (timeout=%s)" % portal_timeout) etuds = _normalize_apo_fields(xml_to_list_of_dicts(doc, req=req)) # Filtre sur annee inscription Apogee: def check_inscription(e): if e.has_key("inscription"): if e["inscription"] == anneeapogee: return True else: return False else: log( "get_inscrits_etape: pas inscription dans code_etape=%s e=%s" % (code_etape, e) ) return False # ??? pas d'annee d'inscription dans la réponse etuds = [e for e in etuds if check_inscription(e)] return etuds def query_apogee_portal(context, **args): """Recupere les infos sur les etudiants nommés args: nom, prenom, code_nip (nom et prenom matchent des parties de noms) """ etud_url = get_etud_url(context) api_ver = get_portal_api_version(context) if not etud_url: return [] if api_ver > 1: if args["nom"] or args["prenom"]: # Ne fonctionne pas avec l'API 2 sur nom et prenom # XXX TODO : va poser problème pour la page modif données étudiants : A VOIR return [] portal_timeout = context.get_preference("portal_timeout") req = etud_url + "?" + urllib.urlencode(args.items()) doc = scu.query_portal(req, timeout=portal_timeout) # sco_utils return xml_to_list_of_dicts(doc, req=req) def xml_to_list_of_dicts(doc, req=None): """Convert an XML 1.0 str to a list of dicts.""" if not doc: return [] # Fix for buggy XML returned by some APIs (eg USPN) invalid_entities = { "&CCEDIL;": "Ç", "& ": "& ", # only when followed by a space (avoid affecting entities) # to be completed... } for k in invalid_entities: doc = doc.replace(k, invalid_entities[k]) # try: dom = xml.dom.minidom.parseString(doc) except xml.parsers.expat.ExpatError as e: # Find faulty part err_zone = doc.splitlines()[e.lineno - 1][e.offset : e.offset + 20] # catch bug: log and re-raise exception log( "xml_to_list_of_dicts: exception in XML parseString\ndoc:\n%s\n(end xml doc)\n" % doc ) raise ScoValueError( 'erreur dans la réponse reçue du portail ! (peut être : "%s")' % err_zone ) infos = [] try: if dom.childNodes[0].nodeName != u"etudiants": raise ValueError etudiants = dom.getElementsByTagName("etudiant") for etudiant in etudiants: d = {} # recupere toutes les valeurs XXX for e in etudiant.childNodes: if e.nodeType == e.ELEMENT_NODE: childs = e.childNodes if len(childs): d[str(e.nodeName)] = childs[0].nodeValue.encode(SCO_ENCODING) infos.append(d) except: log("*** invalid XML response from Etudiant Web Service") log("req=%s" % req) log("doc=%s" % doc) raise ValueError("invalid XML response from Etudiant Web Service\n%s" % doc) return infos def get_infos_apogee_allaccents(context, nom, prenom): "essai recup infos avec differents codages des accents" if nom: unom = unicode(nom, SCO_ENCODING) nom_noaccents = str(scu.suppression_diacritics(unom)) nom_utf8 = unom.encode("utf-8") else: nom_noaccents = nom nom_utf8 = nom if prenom: uprenom = unicode(prenom, SCO_ENCODING) prenom_noaccents = str(scu.suppression_diacritics(uprenom)) prenom_utf8 = uprenom.encode("utf-8") else: prenom_noaccents = prenom prenom_utf8 = prenom # avec accents infos = query_apogee_portal(context, nom=nom, prenom=prenom) # sans accents if nom != nom_noaccents or prenom != prenom_noaccents: infos += query_apogee_portal( context, nom=nom_noaccents, prenom=prenom_noaccents ) # avec accents en UTF-8 if nom_utf8 != nom_noaccents or prenom_utf8 != prenom_noaccents: infos += query_apogee_portal(context, nom=nom_utf8, prenom=prenom_utf8) return infos def get_infos_apogee(context, nom, prenom): """recupere les codes Apogee en utilisant le web service CRIT""" if (not nom) and (not prenom): return [] # essaie plusieurs codages: tirets, accents infos = get_infos_apogee_allaccents(context, nom, prenom) nom_st = nom.replace("-", " ") prenom_st = prenom.replace("-", " ") if nom_st != nom or prenom_st != prenom: infos += get_infos_apogee_allaccents(context, nom_st, prenom_st) # si pas de match et nom ou prenom composé, essaie en coupant if not infos: if nom: nom1 = nom.split()[0] else: nom1 = nom if prenom: prenom1 = prenom.split()[0] else: prenom1 = prenom if nom != nom1 or prenom != prenom1: infos += get_infos_apogee_allaccents(context, nom1, prenom1) return infos def get_etud_apogee(context, code_nip): """Informations à partir du code NIP. None si pas d'infos sur cet etudiant. Exception si reponse invalide. """ if not code_nip: return {} etud_url = get_etud_url(context) if not etud_url: return {} portal_timeout = context.get_preference("portal_timeout") req = etud_url + "?" + urllib.urlencode((("nip", code_nip),)) doc = scu.query_portal(req, timeout=portal_timeout) d = _normalize_apo_fields(xml_to_list_of_dicts(doc, req=req)) if not d: return None if len(d) > 1: raise ValueError("invalid XML response from Etudiant Web Service\n%s" % doc) return d[0] def get_default_etapes(context): """Liste par défaut: devrait etre lue d'un fichier de config""" filename = context.file_path + "/config/default-etapes.txt" log("get_default_etapes: reading %s" % filename) f = open(filename) etapes = {} for line in f.readlines(): line = line.strip() if line and line[0] != "#": dept, code, intitule = [x.strip() for x in line.split(":")] if dept and code: if etapes.has_key(dept): etapes[dept][code] = intitule else: etapes[dept] = {code: intitule} return etapes def _parse_etapes_from_xml(context, doc): """ may raise exception if invalid xml doc """ xml_etapes_by_dept = context.get_preference("xml_etapes_by_dept") # parser XML dom = xml.dom.minidom.parseString(doc) infos = {} if dom.childNodes[0].nodeName != u"etapes": raise ValueError if xml_etapes_by_dept: # Ancien format XML avec des sections par departement: for d in dom.childNodes[0].childNodes: if d.nodeType == d.ELEMENT_NODE: dept = d.nodeName.encode(SCO_ENCODING) _xml_list_codes(infos, dept, d.childNodes) else: # Toutes les étapes: dept = "" _xml_list_codes(infos, "", dom.childNodes[0].childNodes) return infos def get_etapes_apogee(context): """Liste des etapes apogee { departement : { code_etape : intitule } } Demande la liste au portail, ou si échec utilise liste par défaut """ etapes_url = get_etapes_url(context) infos = {} if etapes_url: portal_timeout = context.get_preference("portal_timeout") log( "get_etapes_apogee: requesting '%s' with timeout=%s" % (etapes_url, portal_timeout) ) doc = scu.query_portal(etapes_url, timeout=portal_timeout) try: infos = _parse_etapes_from_xml(context, doc) # cache le resultat (utile si le portail repond de façon intermitente) if infos: log("get_etapes_apogee: caching result") open(SCO_CACHE_ETAPE_FILENAME, "w").write(doc) except: log("invalid XML response from getEtapes Web Service\n%s" % etapes_url) # Avons nous la copie d'une réponse récente ? try: doc = open(SCO_CACHE_ETAPE_FILENAME).read() infos = _parse_etapes_from_xml(context, doc) log("using last saved version from " + SCO_CACHE_ETAPE_FILENAME) except: infos = {} else: # Pas de portail: utilise étapes par défaut livrées avec ScoDoc log("get_etapes_apogee: no configured URL (using default file)") infos = get_default_etapes(context) return infos def _xml_list_codes(target_dict, dept, nodes): for e in nodes: if e.nodeType == e.ELEMENT_NODE: intitule = e.childNodes[0].nodeValue.encode(SCO_ENCODING) code = e.attributes["code"].value.encode(SCO_ENCODING) if target_dict.has_key(dept): target_dict[dept][code] = intitule else: target_dict[dept] = {code: intitule} def get_etapes_apogee_dept(context): """Liste des etapes apogee pour ce departement. Utilise la propriete 'portal_dept_name' pour identifier le departement. Si xml_etapes_by_dept est faux (nouveau format XML depuis sept 2014), le departement n'est pas utilisé: toutes les étapes sont présentées. Returns [ ( code, intitule) ], ordonnée """ xml_etapes_by_dept = context.get_preference("xml_etapes_by_dept") if xml_etapes_by_dept: portal_dept_name = context.get_preference("portal_dept_name") log('get_etapes_apogee_dept: portal_dept_name="%s"' % portal_dept_name) else: portal_dept_name = "" log("get_etapes_apogee_dept: pas de sections par departement") infos = get_etapes_apogee(context) if portal_dept_name and not infos.has_key(portal_dept_name): log( "get_etapes_apogee_dept: pas de section '%s' dans la reponse portail" % portal_dept_name ) return [] if portal_dept_name: etapes = infos[portal_dept_name].items() else: # prend toutes les etapes etapes = [] for k in infos.keys(): etapes += infos[k].items() etapes.sort() # tri sur le code etape return etapes def _portal_date_dmy2date(s): """date inscription renvoyée sous la forme dd/mm/yy renvoie un objet date, ou None """ s = s.strip() if not s: return None else: d, m, y = [int(x) for x in s.split("/")] # raises ValueError if bad format if y < 100: y += 2000 # 21ème siècle return datetime.date(y, m, d) def _normalize_apo_fields(infolist): """ infolist: liste de dict renvoyés par le portail Apogee recode les champs: paiementinscription (-> booleen), datefinalisationinscription (date) ajoute le champs 'paiementinscription_str' : 'ok', 'Non' ou '?' ajoute les champs 'etape' (= None) et 'prenom' ('') s'ils ne sont pas présents. """ for infos in infolist: if infos.has_key("paiementinscription"): infos["paiementinscription"] = ( scu.strlower(infos["paiementinscription"]) == "true" ) if infos["paiementinscription"]: infos["paiementinscription_str"] = "ok" else: infos["paiementinscription_str"] = "Non" else: infos["paiementinscription"] = None infos["paiementinscription_str"] = "?" if infos.has_key("datefinalisationinscription"): infos["datefinalisationinscription"] = _portal_date_dmy2date( infos["datefinalisationinscription"] ) infos["datefinalisationinscription_str"] = infos[ "datefinalisationinscription" ].strftime("%d/%m/%Y") else: infos["datefinalisationinscription"] = None infos["datefinalisationinscription_str"] = "" if not infos.has_key("etape"): infos["etape"] = None if not infos.has_key("prenom"): infos["prenom"] = "" return infolist def check_paiement_etuds(context, etuds): """Interroge le portail pour vérifier l'état de "paiement" et l'étape d'inscription. Seuls les etudiants avec code NIP sont renseignés. Renseigne l'attribut booleen 'paiementinscription' dans chaque etud. En sortie: modif les champs de chaque etud 'paiementinscription' : True, False ou None 'paiementinscription_str' : 'ok', 'Non' ou '?' ou '(pas de code)' 'etape' : etape Apogee ou None """ # interrogation séquentielle longue... for etud in etuds: if not etud.has_key("code_nip"): etud["paiementinscription"] = None etud["paiementinscription_str"] = "(pas de code)" etud["datefinalisationinscription"] = None etud["datefinalisationinscription_str"] = "NA" etud["etape"] = None else: # Modifie certains champs de l'étudiant: infos = get_etud_apogee(context, etud["code_nip"]) if infos: for k in ( "paiementinscription", "paiementinscription_str", "datefinalisationinscription", "datefinalisationinscription_str", "etape", ): etud[k] = infos[k] else: etud["datefinalisationinscription"] = None etud["datefinalisationinscription_str"] = "Erreur" etud["datefinalisationinscription"] = None etud["paiementinscription_str"] = "(pb cnx Apogée)" def get_maquette_apogee(context, etape="", annee_scolaire=""): """Maquette CSV Apogee pour une étape et une annee scolaire""" maquette_url = get_maquette_url(context) if not maquette_url: return None portal_timeout = context.get_preference("portal_timeout") req = ( maquette_url + "?" + urllib.urlencode((("etape", etape), ("annee", annee_scolaire))) ) doc = scu.query_portal(req, timeout=portal_timeout) return doc