ScoDoc/tools/anonymize_db.py

244 lines
8.0 KiB
Python
Executable File

#!/opt/scodoc/venv/bin/python
# -*- coding: utf-8 -*-
# -*- mode: python -*-
##############################################################################
#
# Gestion scolarite IUT
#
# Copyright (c) 1999 - 2024 Emmanuel Viennet. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Emmanuel Viennet emmanuel.viennet@viennet.net
#
##############################################################################
"""Anonymize a ScoDoc database.

Run as user "scodoc", with scodoc and postgresql up.
Works entirely at the SQL level; does not use any SQLAlchemy model.
E. Viennet, Jan 2019, Feb 2024
"""
import random
import sys
import traceback
import psycopg2
from psycopg2 import extras
def log(msg):
    """Write ``msg`` plus a trailing newline to stderr.

    stdout is flushed before the write and stderr right after it, so the
    message appears in order relative to output already printed on stdout.
    """
    sys.stdout.flush()
    line = msg + "\n"
    err = sys.stderr
    err.write(line)
    err.flush()
def usage():
    """Print the command-line synopsis on stderr, then exit with status 1."""
    # Flush both streams first so the usage line comes after pending output.
    for stream in (sys.stdout, sys.stderr):
        stream.flush()
    program = sys.argv[0]
    print(f"Usage: {program} [--users] dbname", file=sys.stderr)
    sys.exit(1)
# --- Anonymization expressions, written in SQL
# Each value is a raw SQL fragment that anonymize_column() pastes verbatim as
# the right-hand side of "UPDATE <table> SET <column> = ...".
# NOTE(review): random_text_md5() is not defined anywhere in this file;
# presumably it is a SQL function already present in the target database --
# confirm before running against a fresh database.
anonymize_name = "random_text_md5(8)"
# Fixed placeholder date, as a SQL string literal (note the inner quotes).
anonymize_date = "'1970-01-01'"
# SQL boolean literal.
anonymize_false = "FALSE"
# SQL string literal containing a single question mark.
anonymize_question_str = "'?'"
# SQL NULL: the column is simply emptied.
anonymize_null = "NULL"
# --- Lists of family names and first names used to replace identities
def _load_names(path):
    """Return the stripped, non-empty lines of the UTF-8 text file ``path``.

    The file is closed as soon as it has been read. Blank lines are dropped:
    an empty name would otherwise reach ``anonymize_users``, which takes the
    first character of the chosen first name.

    Raises ``OSError`` if the file cannot be opened.
    """
    with open(path, encoding="utf8") as names_file:
        stripped = (line.strip() for line in names_file)
        return [name for name in stripped if name]


NOMS = _load_names("/opt/scodoc/tools/fakeportal/nomsprenoms/noms.txt")
PRENOMS = _load_names("/opt/scodoc/tools/fakeportal/nomsprenoms/prenoms.txt")
# --- Fields to anonymize (this configuration could be moved to a separate
# file, and the code would then be generic for any postgresql database).
#
# The goal is to remove the personal data of students and companies.
#
# Maps "table.column" to the SQL expression assigned to every row of that
# column by anonymize_column(). Columns are processed in the order listed.
ANONYMIZED_FIELDS = {
    # -- identite table.
    # nom/prenom first get random text here; the script entry point then calls
    # rename_students(), which overwrites them with entries of NOMS/PRENOMS.
    "identite.nom": anonymize_name,
    "identite.prenom": anonymize_name,
    "identite.nom_usuel": anonymize_null,
    "identite.civilite_etat_civil": anonymize_null,
    "identite.prenom_etat_civil": anonymize_null,
    "identite.date_naissance": anonymize_date,
    "identite.lieu_naissance": anonymize_question_str,
    "identite.dept_naissance": anonymize_question_str,
    "identite.nationalite": anonymize_question_str,
    "identite.statut": anonymize_null,
    "identite.boursier": anonymize_false,
    "identite.photo_filename": anonymize_null,
    "identite.code_nip": anonymize_null,
    "identite.code_ine": anonymize_null,
    "identite.scodoc7_id": anonymize_null,
    # -- adresse table: every row gets the same placeholder email, all other
    # contact columns are set to NULL.
    "adresse.email": "'ano@nyme.fr'",
    "adresse.emailperso": anonymize_null,
    "adresse.domicile": anonymize_null,
    "adresse.codepostaldomicile": anonymize_null,
    "adresse.villedomicile": anonymize_null,
    "adresse.paysdomicile": anonymize_null,
    "adresse.telephone": anonymize_null,
    "adresse.telephonemobile": anonymize_null,
    "adresse.fax": anonymize_null,
    # -- free-text columns of other tables, replaced by random text or NULL.
    "admissions.nomlycee": anonymize_name,
    "billet_absence.description": anonymize_null,
    "etud_annotations.comment": anonymize_name,
    "notes_appreciations.comment": anonymize_name,
}
def anonymize_column(cursor, tablecolumn):
    """Overwrite every row of one column with its anonymized value.

    ``tablecolumn`` has the form ``table_name.column_name`` (for example
    ``"identite.nom"``) and must be a key of ``ANONYMIZED_FIELDS``. The
    associated value is a SQL expression that is assigned to the column by a
    single ``UPDATE`` without ``WHERE`` clause, executed on ``cursor``.

    Raises ``ValueError`` when ``tablecolumn`` does not contain exactly one
    dot, and ``KeyError`` when it is not listed in ``ANONYMIZED_FIELDS``.
    """
    table, column = tablecolumn.split(".")
    replacement = ANONYMIZED_FIELDS[tablecolumn]
    log(f"processing {tablecolumn}")
    # Identifiers and expression come from the module's own configuration,
    # not from user input, so they are interpolated directly.
    query = f"UPDATE {table} SET {column} = {replacement};"
    cursor.execute(query)
def rename_students(cursor):
    """Give every student a fictitious family name and first name.

    For each row of the ``identite`` table, ``nom`` and ``prenom`` are
    replaced by values drawn at random (independently, with replacement) from
    the module-level ``NOMS`` and ``PRENOMS`` lists.

    ``cursor`` must return rows that can be indexed by column name (the script
    creates a psycopg2 ``DictCursor``). Nothing is committed here.

    Raises ``IndexError`` (from ``random.choice``) if ``NOMS`` or ``PRENOMS``
    is empty and the table has at least one row.
    """
    # Only the primary key is needed to address each row: do not pull all the
    # personal columns of every student into memory.
    cursor.execute("""SELECT id FROM "identite";""")
    # Fetch everything first: the same cursor is reused for the UPDATEs below.
    etuds = cursor.fetchall()
    for etud in etuds:
        cursor.execute(
            """UPDATE "identite"
            SET nom=%(nom)s, prenom=%(prenom)s
            WHERE id=%(id)s
            """,
            {
                "id": etud["id"],
                "nom": random.choice(NOMS),
                "prenom": random.choice(PRENOMS),
            },
        )
def anonymize_users(cursor):
    """Anonymize the ``user`` table and the logins referenced by other tables.

    Steps:

    1. For every account, reset the email, password hashes, creation and
       expiration dates and the token columns to fixed values.
    2. For every account whose ``user_name`` is not ``admin``, draw a random
       family name / first name from ``NOMS`` / ``PRENOMS``, derive a new
       unique login (first letter of the first name + family name, lowercased,
       suffixed with ``x`` until unused) and store name, email and login.
    3. Propagate each login change to the tables that store the login as
       plain text.

    ``cursor`` must return rows that can be indexed by column name (the script
    creates a psycopg2 ``DictCursor``). Nothing is committed here.
    """
    log("processing user table")
    # A single statement: one pass over the table instead of one per column.
    cursor.execute(
        """UPDATE "user" SET
            email = 'x@y.fr',
            password_hash = '*',
            password_scodoc7 = NULL,
            date_created = '2001-01-01',
            date_expiration = '2201-12-31',
            token = NULL,
            token_expiration = NULL;
        """
    )
    # Tables holding a login as an external (textual) reference.
    referencing_columns = (
        ("etud_annotations", "author"),
        ("scolog", "authenticated_user"),
        ("scolar_news", "authenticated_user"),
        ("notes_appreciations", "author"),
        ("are_historique", "authenticated_user"),
    )
    # Change names / first names / emails / logins.
    # Only id and user_name are read from the fetched rows.
    cursor.execute("""SELECT id, user_name FROM "user" WHERE user_name <> 'admin';""")
    users = cursor.fetchall()  # fetch all: the table is modified in the loop
    nb_users = len(users)
    used_user_names = {u["user_name"] for u in users}
    # 'admin' is excluded from the query above but is still a taken login: a
    # generated login must never collide with it.
    used_user_names.add("admin")
    for i, user in enumerate(users):
        user_name = user["user_name"]
        nom, prenom = random.choice(NOMS), random.choice(PRENOMS)
        # prenom[:1] (not prenom[0]) so an empty first name cannot crash the
        # run; fall back to a fixed stem if both parts are empty.
        new_name = (prenom[:1] + nom).lower() or "anonyme"
        # Make it unique among old and already-assigned logins, so that the
        # propagation below can never touch rows of another account.
        while new_name in used_user_names:
            new_name += "x"
        used_user_names.add(new_name)
        print(f"{i}/{nb_users}\t{user_name} > {new_name}")
        cursor.execute(
            """UPDATE "user"
            SET nom=%(nom)s, prenom=%(prenom)s, email=%(email)s, user_name=%(new_name)s
            WHERE id=%(id)s
            """,
            {
                "email": f"{prenom}.{nom}@ano.nyme",
                "id": user["id"],
                "nom": nom,
                "prenom": prenom,
                "new_name": new_name,
            },
        )
        # Rewrite the old login wherever it is used as an external reference.
        for table, field in referencing_columns:
            cursor.execute(
                f"""UPDATE "{table}"
                SET {field}=%(new_name)s
                WHERE {field}=%(user_name)s
                """,
                {
                    "new_name": new_name,
                    "user_name": user_name,
                },
            )
def anonymize_db(cursor):
    """Anonymize, one after the other, every column listed in ANONYMIZED_FIELDS.

    Each key is handed to ``anonymize_column`` together with ``cursor``, in
    the dictionary's own order.
    """
    for field_key in ANONYMIZED_FIELDS:
        anonymize_column(cursor, field_key)
def main():
    """Command-line entry point: ``anonymize_db.py [--users] dbname``.

    Connects to the PostgreSQL database ``dbname``, anonymizes the configured
    columns, renames the students and, when ``--users`` is given, anonymizes
    the user accounts as well. Everything runs in one transaction that is
    committed only if every step succeeded.

    Exits with status 1 (through ``usage()`` or directly) on a bad command
    line or when the connection cannot be established.
    """
    args = sys.argv[1:]
    if len(args) == 1:
        process_users = False
        dbname = args[0]
    elif len(args) == 2 and args[0] == "--users":
        process_users = True
        dbname = args[1]
    else:
        usage()  # does not return (calls sys.exit)

    log(f"\nAnonymizing database {dbname}")
    cnx_string = "dbname=" + dbname
    try:
        cnx = psycopg2.connect(cnx_string)
    except Exception:  # top-level boundary: report, then stop
        log(f"\n*** Error: can't connect to database {dbname} ***\n")
        log(f"""connexion string was "{cnx_string}" """)
        traceback.print_exc()
        # Without a connection there is nothing left to do; previously the
        # script carried on and died with a NameError on the unbound `cnx`.
        sys.exit(1)

    try:
        cnx.set_session(autocommit=False)
        cursor = cnx.cursor(cursor_factory=psycopg2.extras.DictCursor)
        anonymize_db(cursor)
        rename_students(cursor)
        if process_users:
            anonymize_users(cursor)
        cnx.commit()
    finally:
        # Closing without a prior commit discards the pending transaction, so
        # a failure part-way through leaves the database untouched.
        cnx.close()


if __name__ == "__main__":
    main()