MonScoDocEssai/sco_portal_apogee.py

558 lines
19 KiB
Python

# -*- mode: python -*-
# -*- coding: utf-8 -*-
##############################################################################
#
# Gestion scolarite IUT
#
# Copyright (c) 1999 - 2020 Emmanuel Viennet. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Emmanuel Viennet emmanuel.viennet@viennet.net
#
##############################################################################
"""Liaison avec le portail ENT (qui donne accès aux infos Apogée)
"""
from sco_utils import *
SCO_CACHE_ETAPE_FILENAME = os.path.join(SCO_TMPDIR, "last_etapes.xml")
def has_portal(context):
"True if we are connected to a portal"
return get_portal_url(context)
class PortalInterface:
def __init__(self):
self.warning = False
def get_portal_url(self, context):
"URL of portal"
portal_url = context.get_preference("portal_url")
if not self.warning:
if portal_url:
log("Portal URL=%s" % portal_url)
else:
log("Portal not configured")
self.warning = True
return portal_url
def get_etapes_url(self, context):
"Full URL of service giving list of etapes (in XML)"
etapes_url = context.get_preference("etapes_url")
if not etapes_url:
# Default:
portal_url = self.get_portal_url(context)
if not portal_url:
return None
api_ver = self.get_portal_api_version(context)
if api_ver > 1:
etapes_url = portal_url + "scodocEtapes.php"
else:
etapes_url = portal_url + "getEtapes.php"
return etapes_url
def get_etud_url(self, context):
"Full URL of service giving list of students (in XML)"
etud_url = context.get_preference("etud_url")
if not etud_url:
# Default:
portal_url = self.get_portal_url(context)
if not portal_url:
return None
api_ver = self.get_portal_api_version(context)
if api_ver > 1:
etud_url = portal_url + "scodocEtudiant.php"
else:
etud_url = portal_url + "getEtud.php"
return etud_url
def get_photo_url(self, context):
"Full URL of service giving photo of student"
photo_url = context.get_preference("photo_url")
if not photo_url:
# Default:
portal_url = self.get_portal_url(context)
if not portal_url:
return None
api_ver = self.get_portal_api_version(context)
if api_ver > 1:
photo_url = portal_url + "scodocPhoto.php"
else:
photo_url = portal_url + "getPhoto.php"
return photo_url
def get_maquette_url(self, context):
"""Full URL of service giving Apogee maquette pour une étape (fichier "CSV")"""
maquette_url = context.get_preference("maquette_url")
if not maquette_url:
# Default:
portal_url = self.get_portal_url(context)
if not portal_url:
return None
maquette_url = portal_url + "scodocMaquette.php"
return maquette_url
def get_portal_api_version(self, context):
"API version of the portal software"
api_ver = context.get_preference("portal_api")
if not api_ver:
# Default:
api_ver = 1
return api_ver
_PI = PortalInterface()
get_portal_url = _PI.get_portal_url
get_etapes_url = _PI.get_etapes_url
get_etud_url = _PI.get_etud_url
get_photo_url = _PI.get_photo_url
get_maquette_url = _PI.get_maquette_url
get_portal_api_version = _PI.get_portal_api_version
def get_inscrits_etape(context, code_etape, anneeapogee=None, ntrials=2):
"""Liste des inscrits à une étape Apogée
Result = list of dicts
ntrials: try several time the same request, useful for some bad web services
"""
log("get_inscrits_etape: code=%s anneeapogee=%s" % (code_etape, anneeapogee))
if anneeapogee is None:
anneeapogee = str(time.localtime()[0])
etud_url = get_etud_url(context)
api_ver = get_portal_api_version(context)
if not etud_url:
return []
portal_timeout = context.get_preference("portal_timeout")
if api_ver > 1:
req = (
etud_url
+ "?"
+ urllib.urlencode((("etape", code_etape), ("annee", anneeapogee)))
)
else:
req = etud_url + "?" + urllib.urlencode((("etape", code_etape),))
actual_timeout = float(portal_timeout) / ntrials
if portal_timeout > 0:
actual_timeout = max(1, actual_timeout)
for _ntrial in range(ntrials):
doc = query_portal(req, timeout=actual_timeout)
if doc:
break
if not doc:
raise ScoValueError("pas de réponse du portail ! (timeout=%s)" % portal_timeout)
etuds = _normalize_apo_fields(xml_to_list_of_dicts(doc, req=req))
# Filtre sur annee inscription Apogee:
def check_inscription(e):
if e.has_key("inscription"):
if e["inscription"] == anneeapogee:
return True
else:
return False
else:
log(
"get_inscrits_etape: pas inscription dans code_etape=%s e=%s"
% (code_etape, e)
)
return False # ??? pas d'annee d'inscription dans la réponse
etuds = [e for e in etuds if check_inscription(e)]
return etuds
def query_apogee_portal(context, **args):
"""Recupere les infos sur les etudiants nommés
args: nom, prenom, code_nip
(nom et prenom matchent des parties de noms)
"""
etud_url = get_etud_url(context)
api_ver = get_portal_api_version(context)
if not etud_url:
return []
if api_ver > 1:
if args["nom"] or args["prenom"]:
# Ne fonctionne pas avec l'API 2 sur nom et prenom
# XXX TODO : va poser problème pour la page modif données étudiants : A VOIR
return []
portal_timeout = context.get_preference("portal_timeout")
req = etud_url + "?" + urllib.urlencode(args.items())
doc = query_portal(req, timeout=portal_timeout) # sco_utils
return xml_to_list_of_dicts(doc, req=req)
def xml_to_list_of_dicts(doc, req=None):
"""Convert an XML 1.0 str to a list of dicts."""
if not doc:
return []
# Fix for buggy XML returned by some APIs (eg USPN)
invalid_entities = {
"Ç": "Ç",
"& ": "& ", # only when followed by a space (avoid affecting entities)
# to be completed...
}
for k in invalid_entities:
doc = doc.replace(k, invalid_entities[k])
#
try:
dom = xml.dom.minidom.parseString(doc)
except xml.parsers.expat.ExpatError as e:
# Find faulty part
err_zone = doc.splitlines()[e.lineno - 1][e.offset : e.offset + 20]
# catch bug: log and re-raise exception
log(
"xml_to_list_of_dicts: exception in XML parseString\ndoc:\n%s\n(end xml doc)\n"
% doc
)
raise ScoValueError(
'erreur dans la réponse reçue du portail ! (peut être : "%s")' % err_zone
)
infos = []
try:
if dom.childNodes[0].nodeName != u"etudiants":
raise ValueError
etudiants = dom.getElementsByTagName("etudiant")
for etudiant in etudiants:
d = {}
# recupere toutes les valeurs <valeur>XXX</valeur>
for e in etudiant.childNodes:
if e.nodeType == e.ELEMENT_NODE:
childs = e.childNodes
if len(childs):
d[str(e.nodeName)] = childs[0].nodeValue.encode(SCO_ENCODING)
infos.append(d)
except:
log("*** invalid XML response from Etudiant Web Service")
log("req=%s" % req)
log("doc=%s" % doc)
raise ValueError("invalid XML response from Etudiant Web Service\n%s" % doc)
return infos
def get_infos_apogee_allaccents(context, nom, prenom):
"essai recup infos avec differents codages des accents"
if nom:
unom = unicode(nom, SCO_ENCODING)
nom_noaccents = str(suppression_diacritics(unom))
nom_utf8 = unom.encode("utf-8")
else:
nom_noaccents = nom
nom_utf8 = nom
if prenom:
uprenom = unicode(prenom, SCO_ENCODING)
prenom_noaccents = str(suppression_diacritics(uprenom))
prenom_utf8 = uprenom.encode("utf-8")
else:
prenom_noaccents = prenom
prenom_utf8 = prenom
# avec accents
infos = query_apogee_portal(context, nom=nom, prenom=prenom)
# sans accents
if nom != nom_noaccents or prenom != prenom_noaccents:
infos += query_apogee_portal(
context, nom=nom_noaccents, prenom=prenom_noaccents
)
# avec accents en UTF-8
if nom_utf8 != nom_noaccents or prenom_utf8 != prenom_noaccents:
infos += query_apogee_portal(context, nom=nom_utf8, prenom=prenom_utf8)
return infos
def get_infos_apogee(context, nom, prenom):
"""recupere les codes Apogee en utilisant le web service CRIT"""
if (not nom) and (not prenom):
return []
# essaie plusieurs codages: tirets, accents
infos = get_infos_apogee_allaccents(context, nom, prenom)
nom_st = nom.replace("-", " ")
prenom_st = prenom.replace("-", " ")
if nom_st != nom or prenom_st != prenom:
infos += get_infos_apogee_allaccents(context, nom_st, prenom_st)
# si pas de match et nom ou prenom composé, essaie en coupant
if not infos:
if nom:
nom1 = nom.split()[0]
else:
nom1 = nom
if prenom:
prenom1 = prenom.split()[0]
else:
prenom1 = prenom
if nom != nom1 or prenom != prenom1:
infos += get_infos_apogee_allaccents(context, nom1, prenom1)
return infos
def get_etud_apogee(context, code_nip):
"""Informations à partir du code NIP.
None si pas d'infos sur cet etudiant.
Exception si reponse invalide.
"""
if not code_nip:
return {}
etud_url = get_etud_url(context)
if not etud_url:
return {}
portal_timeout = context.get_preference("portal_timeout")
req = etud_url + "?" + urllib.urlencode((("nip", code_nip),))
doc = query_portal(req, timeout=portal_timeout)
d = _normalize_apo_fields(xml_to_list_of_dicts(doc, req=req))
if not d:
return None
if len(d) > 1:
raise ValueError("invalid XML response from Etudiant Web Service\n%s" % doc)
return d[0]
def get_default_etapes(context):
"""Liste par défaut: devrait etre lue d'un fichier de config"""
filename = context.file_path + "/config/default-etapes.txt"
log("get_default_etapes: reading %s" % filename)
f = open(filename)
etapes = {}
for line in f.readlines():
line = line.strip()
if line and line[0] != "#":
dept, code, intitule = [x.strip() for x in line.split(":")]
if dept and code:
if etapes.has_key(dept):
etapes[dept][code] = intitule
else:
etapes[dept] = {code: intitule}
return etapes
def _parse_etapes_from_xml(context, doc):
"""
may raise exception if invalid xml doc
"""
xml_etapes_by_dept = context.get_preference("xml_etapes_by_dept")
# parser XML
dom = xml.dom.minidom.parseString(doc)
infos = {}
if dom.childNodes[0].nodeName != u"etapes":
raise ValueError
if xml_etapes_by_dept:
# Ancien format XML avec des sections par departement:
for d in dom.childNodes[0].childNodes:
if d.nodeType == d.ELEMENT_NODE:
dept = d.nodeName.encode(SCO_ENCODING)
_xml_list_codes(infos, dept, d.childNodes)
else:
# Toutes les étapes:
dept = ""
_xml_list_codes(infos, "", dom.childNodes[0].childNodes)
return infos
def get_etapes_apogee(context):
"""Liste des etapes apogee
{ departement : { code_etape : intitule } }
Demande la liste au portail, ou si échec utilise liste
par défaut
"""
etapes_url = get_etapes_url(context)
infos = {}
if etapes_url:
portal_timeout = context.get_preference("portal_timeout")
log(
"get_etapes_apogee: requesting '%s' with timeout=%s"
% (etapes_url, portal_timeout)
)
doc = query_portal(etapes_url, timeout=portal_timeout)
try:
infos = _parse_etapes_from_xml(context, doc)
# cache le resultat (utile si le portail repond de façon intermitente)
if infos:
log("get_etapes_apogee: caching result")
open(SCO_CACHE_ETAPE_FILENAME, "w").write(doc)
except:
log("invalid XML response from getEtapes Web Service\n%s" % etapes_url)
# Avons nous la copie d'une réponse récente ?
try:
doc = open(SCO_CACHE_ETAPE_FILENAME).read()
infos = _parse_etapes_from_xml(context, doc)
log("using last saved version from " + SCO_CACHE_ETAPE_FILENAME)
except:
infos = {}
else:
# Pas de portail: utilise étapes par défaut livrées avec ScoDoc
log("get_etapes_apogee: no configured URL (using default file)")
infos = get_default_etapes(context)
return infos
def _xml_list_codes(target_dict, dept, nodes):
for e in nodes:
if e.nodeType == e.ELEMENT_NODE:
intitule = e.childNodes[0].nodeValue.encode(SCO_ENCODING)
code = e.attributes["code"].value.encode(SCO_ENCODING)
if target_dict.has_key(dept):
target_dict[dept][code] = intitule
else:
target_dict[dept] = {code: intitule}
def get_etapes_apogee_dept(context):
"""Liste des etapes apogee pour ce departement.
Utilise la propriete 'portal_dept_name' pour identifier le departement.
Si xml_etapes_by_dept est faux (nouveau format XML depuis sept 2014),
le departement n'est pas utilisé: toutes les étapes sont présentées.
Returns [ ( code, intitule) ], ordonnée
"""
xml_etapes_by_dept = context.get_preference("xml_etapes_by_dept")
if xml_etapes_by_dept:
portal_dept_name = context.get_preference("portal_dept_name")
log('get_etapes_apogee_dept: portal_dept_name="%s"' % portal_dept_name)
else:
portal_dept_name = ""
log("get_etapes_apogee_dept: pas de sections par departement")
infos = get_etapes_apogee(context)
if portal_dept_name and not infos.has_key(portal_dept_name):
log(
"get_etapes_apogee_dept: pas de section '%s' dans la reponse portail"
% portal_dept_name
)
return []
if portal_dept_name:
etapes = infos[portal_dept_name].items()
else:
# prend toutes les etapes
etapes = []
for k in infos.keys():
etapes += infos[k].items()
etapes.sort() # tri sur le code etape
return etapes
def _portal_date_dmy2date(s):
"""date inscription renvoyée sous la forme dd/mm/yy
renvoie un objet date, ou None
"""
s = s.strip()
if not s:
return None
else:
d, m, y = [int(x) for x in s.split("/")] # raises ValueError if bad format
if y < 100:
y += 2000 # 21ème siècle
return datetime.date(y, m, d)
def _normalize_apo_fields(infolist):
"""
infolist: liste de dict renvoyés par le portail Apogee
recode les champs: paiementinscription (-> booleen), datefinalisationinscription (date)
ajoute le champs 'paiementinscription_str' : 'ok', 'Non' ou '?'
ajoute les champs 'etape' (= None) et 'prenom' ('') s'ils ne sont pas présents.
"""
for infos in infolist:
if infos.has_key("paiementinscription"):
infos["paiementinscription"] = (
strlower(infos["paiementinscription"]) == "true"
)
if infos["paiementinscription"]:
infos["paiementinscription_str"] = "ok"
else:
infos["paiementinscription_str"] = "Non"
else:
infos["paiementinscription"] = None
infos["paiementinscription_str"] = "?"
if infos.has_key("datefinalisationinscription"):
infos["datefinalisationinscription"] = _portal_date_dmy2date(
infos["datefinalisationinscription"]
)
infos["datefinalisationinscription_str"] = infos[
"datefinalisationinscription"
].strftime("%d/%m/%Y")
else:
infos["datefinalisationinscription"] = None
infos["datefinalisationinscription_str"] = ""
if not infos.has_key("etape"):
infos["etape"] = None
if not infos.has_key("prenom"):
infos["prenom"] = ""
return infolist
def check_paiement_etuds(context, etuds):
"""Interroge le portail pour vérifier l'état de "paiement" et l'étape d'inscription.
Seuls les etudiants avec code NIP sont renseignés.
Renseigne l'attribut booleen 'paiementinscription' dans chaque etud, ainsi que l'étape.
En sortie: modif les champs de chaque etud
'paiementinscription' : True, False ou None
'paiementinscription_str' : 'ok', 'Non' ou '?' ou '(pas de code)'
'etape' : etape Apogee ou None
"""
# interrogation séquentielle longue...
for etud in etuds:
if not etud.has_key("code_nip"):
etud["paiementinscription"] = None
etud["paiementinscription_str"] = "(pas de code)"
etud["datefinalisationinscription"] = None
etud["datefinalisationinscription_str"] = "NA"
etud["etape"] = None
else:
# Modifie certains champs de l'étudiant:
infos = get_etud_apogee(context, etud["code_nip"])
if infos:
for k in (
"paiementinscription",
"paiementinscription_str",
"datefinalisationinscription",
"datefinalisationinscription_str",
"etape",
):
etud[k] = infos[k]
else:
etud["datefinalisationinscription"] = None
etud["datefinalisationinscription_str"] = "Erreur"
etud["datefinalisationinscription"] = None
etud["paiementinscription_str"] = "(pb cnx Apogée)"
def get_maquette_apogee(context, etape="", annee_scolaire=""):
"""Maquette CSV Apogee pour une étape et une annee scolaire"""
maquette_url = get_maquette_url(context)
if not maquette_url:
return None
portal_timeout = context.get_preference("portal_timeout")
req = (
maquette_url
+ "?"
+ urllib.urlencode((("etape", etape), ("annee", annee_scolaire)))
)
doc = query_portal(req, timeout=portal_timeout)
return doc