# ScoDoc/tools/migrate_abs_to_assiduites.py (456 lines, 14 KiB, Python)

"""
Script de migration des données de la base "absences" -> "assiduites"/"justificatifs"
Ecrit par Matthias HARTMANN
"""
from datetime import date, datetime, time, timedelta
from json import dump, dumps
from sqlalchemy import not_
from flask import g
from app import db
from app.models import (
Absence,
Assiduite,
Departement,
Identite,
Justificatif,
ModuleImplInscription,
)
from app.models.config import ScoDocSiteConfig
from app.profiler import Profiler
from app.scodoc.sco_utils import (
EtatAssiduite,
EtatJustificatif,
TerminalColor,
localize_datetime,
print_progress_bar,
)
from app.scodoc import notesdb as ndb
class _glob:
    """Script-wide mutable state shared by the migration helpers.

    The class is used as a plain namespace: attributes are read and written
    directly on the class, it is never instantiated.
    """

    # When True, migrate_dept() prints the computed statistics as JSON.
    DEBUG: bool = False
    # NOTE(review): not referenced by the code visible in this file.
    PROBLEMS: dict[int, list[str]] = {}
    # Ids of the students of the department being migrated; rebuilt as a set
    # by migrate_dept() and only used for membership tests.
    DEPT_ETUDIDS: set[int] = set()
    # [number of assiduités exported, number of justificatifs exported];
    # reset to [0, 0] by migrate_dept() for every department.
    COMPTE: list[int] = [0, 0]
    # NOTE(review): reset by migrate_dept() but otherwise unused in this file.
    ERR_ETU: list[int] = []
    # Merger currently accumulating half-days for an assiduité / a
    # justificatif; None when nothing is pending.
    MERGER_ASSI: "_Merger | None" = None
    MERGER_JUST: "_Merger | None" = None
    # etudid -> list of (start, end) of the justificatifs already exported.
    JUSTIFS: dict[int, list[tuple[datetime, datetime]]] = {}
    # Half-day boundaries, set by migrate_abs_to_assiduites().
    MORNING: "time | None" = None
    NOON: "time | None" = None
    AFTERNOON: "time | None" = None
    EVENING: "time | None" = None
    # Database connection and cursor, set by migrate_abs_to_assiduites().
    cnx = None
    cursor = None
class _Merger:
    """Accumulate contiguous legacy half-day absences of one student.

    A merger starts from one legacy ``Absence`` row and is extended with
    ``merge()`` as long as the following rows are contiguous. ``export()``
    then writes the merged period as one row of ``assiduites`` (when
    ``est_abs`` is true) or of ``justificatifs`` (otherwise).

    ``deb`` and ``fin`` are ``(jour, matin)`` couples: the day and whether the
    half-day is the morning one.
    """

    def __init__(self, abs_: "Absence", est_abs: bool) -> None:
        """Start a new period from the legacy absence ``abs_``.

        ``est_abs`` selects the export target: assiduité if true,
        justificatif if false.
        """
        self.deb = (abs_.jour, abs_.matin)
        self.fin = (abs_.jour, abs_.matin)
        self.moduleimpl = abs_.moduleimpl_id
        self.etudid = abs_.etudid
        self.est_abs = est_abs
        self.raison = abs_.description
        self.entry_date = abs_.entry_date
        self.est_just = abs_.estjust

    def merge(self, abs_: "Absence") -> bool:
        """Try to extend the current period with ``abs_``.

        Return True when ``abs_`` was absorbed, False when it belongs to
        another student or is not contiguous with the current end (the caller
        must then export this merger and start a new one).
        """
        if self.etudid != abs_.etudid:
            return False

        if self.fin == (abs_.jour, abs_.matin):
            # Same half-day recorded several times: the module can no longer
            # be attributed to a single moduleimpl.
            self.moduleimpl = None
        elif self.fin[1]:
            # Period currently ends on a morning: only the same day (hence
            # its afternoon) can follow.
            if abs_.jour != self.fin[0]:
                return False
        else:
            # Period currently ends on an afternoon: only the next morning
            # with the same justification status can follow.
            is_next_day: bool = abs_.jour - timedelta(days=1) == self.fin[0]
            if not (is_next_day and abs_.matin and self.est_just == abs_.estjust):
                return False

        self.est_just = self.est_just or abs_.estjust
        self.fin = (abs_.jour, abs_.matin)
        return True

    @staticmethod
    def _tuple_to_date(couple: tuple[date, bool], end=False):
        """Convert a ``(jour, matin)`` couple to a localized datetime.

        With ``end`` false the start of the half-day is used (MORNING or
        AFTERNOON), with ``end`` true its end (NOON or EVENING).
        """
        if couple[1]:
            time_ = _glob.NOON if end else _glob.MORNING
        else:
            time_ = _glob.EVENING if end else _glob.AFTERNOON
        return localize_datetime(datetime.combine(couple[0], time_))

    def _covered_by_pending_justif(self, pending, date_deb, date_fin) -> bool:
        """Tell whether ``pending`` justifies the period [date_deb, date_fin].

        ``pending`` is the justificatif merger that has not been exported yet
        (or None). It only counts when it belongs to the same student: the
        pending merger may still hold the last justificatif of the previous
        student, whose dates must never justify this student's absence.
        """
        if pending is None or pending.etudid != self.etudid:
            return False
        justi_date_deb = _Merger._tuple_to_date(pending.deb)
        justi_date_fin = _Merger._tuple_to_date(pending.fin, end=True)
        return date_deb >= justi_date_deb and date_fin <= justi_date_fin

    def _to_justif(self):
        """Insert the merged period as a validated justificatif.

        The period is also remembered in ``_glob.JUSTIFS`` so that assiduités
        exported later can be flagged as justified.
        """
        date_deb = _Merger._tuple_to_date(self.deb)
        date_fin = _Merger._tuple_to_date(self.fin, end=True)

        _glob.JUSTIFS[self.etudid].append((date_deb, date_fin))

        _glob.cursor.execute(
            """INSERT INTO justificatifs
            (etudid,date_debut,date_fin,etat,raison,entry_date)
            VALUES (%(etudid)s,%(date_debut)s,%(date_fin)s,%(etat)s,%(raison)s,%(entry_date)s)
            """,
            {
                "etudid": self.etudid,
                "date_debut": date_deb,
                "date_fin": date_fin,
                "etat": EtatJustificatif.VALIDE,
                "raison": self.raison,
                "entry_date": self.entry_date,
            },
        )

    def _to_assi(self):
        """Insert the merged period as an ABSENT assiduité.

        The assiduité is flagged justified when the legacy rows said so, when
        an already exported justificatif of the student covers it, or when
        the student's pending (not yet exported) justificatif covers it.
        """
        date_deb = _Merger._tuple_to_date(self.deb)
        date_fin = _Merger._tuple_to_date(self.fin, end=True)

        self.est_just = (
            _assi_in_justifs(date_deb, date_fin, self.etudid) or self.est_just
        )
        if not self.est_just:
            self.est_just = self._covered_by_pending_justif(
                _glob.MERGER_JUST, date_deb, date_fin
            )

        _glob.cursor.execute(
            """INSERT INTO assiduites
            (etudid,date_debut,date_fin,etat,moduleimpl_id,description,entry_date,est_just)
            VALUES (%(etudid)s,%(date_debut)s,%(date_fin)s,%(etat)s,%(moduleimpl_id)s,%(description)s,%(entry_date)s, %(est_just)s)
            """,
            {
                "etudid": self.etudid,
                "date_debut": date_deb,
                "date_fin": date_fin,
                "etat": EtatAssiduite.ABSENT,
                "moduleimpl_id": self.moduleimpl,
                "description": self.raison,
                "entry_date": self.entry_date,
                "est_just": self.est_just,
            },
        )

    def export(self):
        """Write the merged period to the database and count it.

        ``_glob.COMPTE[0]`` counts assiduités, ``_glob.COMPTE[1]``
        justificatifs.
        """
        if self.est_abs:
            _glob.COMPTE[0] += 1
            self._to_assi()
        else:
            _glob.COMPTE[1] += 1
            self._to_justif()
def _assi_in_justifs(deb, fin, etudid):
    """Tell whether [deb, fin] lies inside an exported justificatif of etudid.

    Looks only at the periods recorded in ``_glob.JUSTIFS[etudid]``.
    """
    for just_deb, just_fin in _glob.JUSTIFS[etudid]:
        if just_deb <= deb and fin <= just_fin:
            return True
    return False
class _Statistics:
    """Collect the absences that could not be migrated, per school year.

    ``object`` maps ``"total"`` to the number of reported problems and each
    school year (int) to ``{"etuds_inexistant": [etudid, ...],
    "abs_invalide": {absence_id: [error, ...]}}``.
    """

    def __init__(self) -> None:
        self.object: "dict[str | int, dict | int]" = {"total": 0}
        # School year the next __add_etud/__add_abs call is filed under.
        self.year: "int | None" = None

    def __set_year(self, year: int):
        """Select ``year`` as current, creating its entry if needed."""
        self.object.setdefault(year, {"etuds_inexistant": [], "abs_invalide": {}})
        self.year = year
        return self

    def __add_etud(self, etudid: int):
        """Record ``etudid`` once as unknown for the current year."""
        unknown = self.object[self.year]["etuds_inexistant"]
        if etudid not in unknown:
            unknown.append(etudid)
        return self

    def __add_abs(self, abs_: int, err: str):
        """Append ``err`` to the errors of absence id ``abs_`` (current year)."""
        self.object[self.year]["abs_invalide"].setdefault(abs_, []).append(err)
        return self

    def add_problem(self, abs_: "Absence", err: str):
        """Record that ``abs_`` could not be migrated because of ``err``.

        The problem is filed under the calendar year of ``abs_.jour``, or the
        following year when the day is strictly after September 15th.
        ``"Etudiant inexistant"`` is recorded per student, any other error
        per absence id.
        """
        jour: date = abs_.jour
        year: int = jour.year
        if date(jour.year, 9, 15) < jour:
            year += 1
        self.__set_year(year)

        if err == "Etudiant inexistant":
            self.__add_etud(abs_.etudid)
        else:
            self.__add_abs(abs_.id, err)

        self.object["total"] += 1

    def compute_stats(self) -> dict:
        """Return the counts: total problems and, per year, the number of
        unknown students and of invalid absences."""
        stats: dict = {"total": self.object["total"]}
        for year, item in self.object.items():
            if year == "total":
                continue
            stats[year] = {
                "etuds_inexistant": len(item["etuds_inexistant"]),
                "abs_invalide": len(item["abs_invalide"]),
            }
        return stats

    def export(self, file):
        """Serialize the detailed problems as JSON into ``file``."""
        dump(self.object, file, indent=2)
def migrate_abs_to_assiduites(
    dept: str = None,
    morning: str = None,
    noon: str = None,
    afternoon: str = None,
    evening: str = None,
    debug: bool = False,
):
    """Migrate legacy absences to assiduités / justificatifs.

    A legacy absence has three states:
        |.estabs|.estjust|
        |1|0| -> unjustified absence
        |1|1| -> justified absence
        |0|1| -> justification only

    Its time is expressed as a half-day:
        .matin: bool (morning or afternoon half-day)
        .jour: date (day of the absence / justification)

    Other fields:
        .moduleimpl_id: relation -> moduleimpl_id
        .description: str -> absence motive / justification reason
        .entry_date: datetime -> timestamp at which the row was entered
        .etudid: relation -> Identite

    ``morning``, ``noon``, ``afternoon`` and ``evening`` are "HH:MM[:SS]"
    strings giving the half-day boundaries; a value left to None is read from
    the site configuration. With ``dept`` None every department is migrated,
    otherwise only the one with that acronym.
    """
    Profiler.clear()
    _glob.DEBUG = debug

    # (_glob attribute, value given by the caller, config key, fallback)
    boundaries = (
        ("MORNING", morning, "assi_morning_time", time(8, 0)),
        ("NOON", noon, "assi_lunch_time", time(13, 0)),
        ("AFTERNOON", afternoon, "assi_lunch_time", time(13, 0)),
        ("EVENING", evening, "assi_afternoon_time", time(18, 0)),
    )
    for attr_name, value, config_key, fallback in boundaries:
        if value is None:
            value = ScoDocSiteConfig.get(config_key, fallback)
        # Only hours and minutes are kept; any seconds field is ignored.
        fields = str(value).split(":")
        setattr(_glob, attr_name, time(int(fields[0]), int(fields[1])))

    ndb.open_db_connection()
    _glob.cnx = g.db_conn
    _glob.cursor = _glob.cnx.cursor()

    if dept is not None:
        migrate_dept(dept, _Statistics(), Profiler("Migration"))
        return

    prof_total = Profiler("MigrationTotal")
    prof_total.start()
    for depart in Departement.query.order_by(Departement.id):
        migrate_dept(
            depart.acronym, _Statistics(), Profiler(f"Migration_{depart.acronym}")
        )
    prof_total.stop()

    print(
        TerminalColor.GREEN
        + f"Fin de la migration, elle a durée {prof_total.elapsed():.2f}"
        + TerminalColor.RESET
    )
def migrate_dept(dept_name: str, stats: _Statistics, time_elapsed: Profiler):
    """Migrate every legacy absence of the department ``dept_name``.

    Rows are read ordered by student, then day, then half-day, and fed one by
    one to ``_from_abs_to_assiduite_justificatif``. Rows rejected with a
    ``ValueError`` are recorded in ``stats``. When at least one problem was
    recorded, the details are written to a JSON file under
    ``/opt/scodoc-data/log/``.

    Raises ``ValueError`` when no department has the acronym ``dept_name``.
    """
    time_elapsed.start()

    absences_query = Absence.query
    dept: Departement = Departement.query.filter_by(acronym=dept_name).first()
    if dept is None:
        raise ValueError(f"Département inexistant: {dept_name}")
    etuds_id: list[int] = [etud.id for etud in dept.etudiants]
    # Every student of the department starts with no exported justificatif.
    for etudid in etuds_id:
        _glob.JUSTIFS[etudid] = []
    absences_query = absences_query.filter(Absence.etudid.in_(etuds_id))
    # NOTE(review): the value is a query object, not a single Absence.
    # The ordering groups the rows of a student together, in chronological
    # order, which is what _Merger.merge() relies on; not_(matin) is
    # presumably there to sort the morning before the afternoon.
    absences: Absence = absences_query.order_by(
        Absence.etudid, Absence.jour, not_(Absence.matin)
    )

    absences_len: int = absences.count()

    if absences_len == 0:
        print(
            f"{TerminalColor.BLUE}Le département {dept_name} ne possède aucune absence.{TerminalColor.RESET}"
        )
        # NOTE(review): time_elapsed is not stopped on this early return.
        return

    # Reset the per-department global state before feeding the mergers.
    _glob.DEPT_ETUDIDS = {e.id for e in Identite.query.filter_by(dept_id=dept.id)}
    _glob.COMPTE = [0, 0]
    _glob.ERR_ETU = []
    _glob.MERGER_ASSI = None
    _glob.MERGER_JUST = None

    print(
        f"{TerminalColor.BLUE}{absences_len} absences du département {dept_name} vont être migrées{TerminalColor.RESET}"
    )

    print_progress_bar(0, absences_len, "Progression", "effectué", autosize=True)

    # Cache: etudid -> set of the moduleimpl ids the student is enrolled in.
    etuds_modimpl_ids = {}
    for i, abs_ in enumerate(absences):
        etud_modimpl_ids = etuds_modimpl_ids.get(abs_.etudid)
        if etud_modimpl_ids is None:
            etud_modimpl_ids = {
                ins.moduleimpl_id
                for ins in ModuleImplInscription.query.filter_by(etudid=abs_.etudid)
            }
            etuds_modimpl_ids[abs_.etudid] = etud_modimpl_ids

        try:
            _from_abs_to_assiduite_justificatif(abs_, etud_modimpl_ids)
        except ValueError as e:
            # The row is skipped and reported; the migration goes on.
            stats.add_problem(abs_, e.args[0])

        if i % 10 == 0:
            print_progress_bar(
                i,
                absences_len,
                "Progression",
                "effectué",
                autosize=True,
            )

        # Commit the inserts every 1000 rows.
        # NOTE(review): the progress bar was already refreshed just above for
        # the same value of i (1000 is a multiple of 10).
        if i % 1000 == 0:
            print_progress_bar(
                i,
                absences_len,
                "Progression",
                "effectué",
                autosize=True,
            )
            _glob.cnx.commit()

    # Flush the periods still held by the mergers, then commit the remainder.
    if _glob.MERGER_ASSI is not None:
        _glob.MERGER_ASSI.export()
    if _glob.MERGER_JUST is not None:
        _glob.MERGER_JUST.export()

    _glob.cnx.commit()

    print_progress_bar(
        absences_len,
        absences_len,
        "Progression",
        "effectué",
        autosize=True,
    )

    # NOTE(review): the post-migration justification pass below is disabled;
    # est_just is computed while each assiduité is exported instead.
    # print(
    #     TerminalColor.RED
    #     + f"Justification des absences du département {dept_name}, veuillez patienter, ceci peut prendre un certain temps."
    #     + TerminalColor.RESET
    # )

    # justifs: Justificatif = Justificatif.query.join(Identite).filter_by(dept_id=dept.id)
    # compute_assiduites_justified(justifs, reset=True)

    time_elapsed.stop()

    statistiques: dict = stats.compute_stats()
    print(
        f"{TerminalColor.GREEN}La migration a pris {time_elapsed.elapsed():.2f} secondes {TerminalColor.RESET}"
    )

    # The file name is built unconditionally but the file is only written
    # when there is at least one problem to report.
    filename = f"/opt/scodoc-data/log/{datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}scodoc_migration_abs_{dept_name}.json"
    if statistiques["total"] > 0:
        # NOTE(review): these two messages do not end with
        # TerminalColor.RESET; the colour is only reset by the next print.
        print(
            f"{TerminalColor.RED}{statistiques['total']} absences qui n'ont pas pu être migrées."
        )
        print(
            f"Vous retrouverez un fichier json {TerminalColor.GREEN}{filename}{TerminalColor.RED} contenant les problèmes de migrations"
        )
        with open(
            filename,
            "w",
            encoding="utf-8",
        ) as file:
            stats.export(file)

    print(
        f"{TerminalColor.CYAN}{_glob.COMPTE[0]} assiduités et {_glob.COMPTE[1]} justificatifs ont été générés pour le département {dept_name}.{TerminalColor.RESET}"
    )

    if _glob.DEBUG:
        print(dumps(statistiques, indent=2))
def _from_abs_to_assiduite_justificatif(_abs: Absence, etud_modimpl_ids: set[int]):
    """Feed one legacy absence to the pending mergers.

    A row with ``estabs`` set goes to the assiduité merger, a row with
    ``estjust`` set goes to the justificatif merger; a row may go to both,
    the assiduité merger being served first.

    Raises ``ValueError`` when the student is not in the department being
    migrated, or when the absence references a moduleimpl that is not in
    ``etud_modimpl_ids``.
    """
    if _abs.etudid not in _glob.DEPT_ETUDIDS:
        raise ValueError("Etudiant inexistant")

    if _abs.estabs:
        has_module = _abs.moduleimpl_id is not None
        if has_module and _abs.moduleimpl_id not in etud_modimpl_ids:
            raise ValueError("Moduleimpl_id incorrect ou étudiant non inscrit")
        _feed_pending_merger("MERGER_ASSI", _abs, True)

    if _abs.estjust:
        _feed_pending_merger("MERGER_JUST", _abs, False)


def _feed_pending_merger(slot: str, legacy_abs, est_abs: bool) -> None:
    """Merge ``legacy_abs`` into the merger stored in ``_glob.<slot>``.

    When the pending merger refuses the row it is exported first; a new
    merger is then started from ``legacy_abs`` and stored in the slot.
    """
    pending = getattr(_glob, slot)
    if pending is not None:
        if pending.merge(legacy_abs):
            return
        pending.export()
    setattr(_glob, slot, _Merger(legacy_abs, est_abs))