809 lines
28 KiB
Python
Executable File
809 lines
28 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
imapsync_daemon.py
|
|
──────────────────
|
|
Daemon d'orchestration de migrations IMAP depuis Grist.
|
|
|
|
Fonctionnement :
|
|
1. Renouvellement du token OAuth2 (au démarrage et à chaque cycle)
|
|
2. Polling de la table BALS dans Grist (colonne Synchronisation = True)
|
|
3. Partition nouvelles BAL / re-passes selon NEW_RATIO
|
|
4. Lancement de N workers imapsync en parallèle
|
|
5. Parse de la sortie imapsync (nb emails, statut, erreurs)
|
|
6. Écriture des résultats dans la table HISTORIQUE de Grist
|
|
|
|
Modes de lancement :
|
|
(aucun argument) Mode daemon normal
|
|
--dry Ajoute --dry à imapsync (simulation, aucune écriture IMAP)
|
|
--test Vérifie toutes les variables d'environnement, la connectivité
|
|
Grist et la présence d'imapsync, puis quitte (0=OK, 1=erreur)
|
|
|
|
Variables d'environnement requises (fichier .env ou export shell) :
|
|
GRIST_API_KEY Clé API Grist
|
|
GRIST_DOC_ID ID du document Grist
|
|
GRIST_BASE_URL URL de base Grist (défaut: http://localhost:8484, ex: https://grist.example.fr)
|
|
HOST1 Serveur IMAP source
|
|
HOST2 Serveur IMAP destination
|
|
USER1_PREFIX Compte admin source (authuser1 / identité OAuth2)
|
|
USER2_PREFIX Compte admin destination (authuser2)
|
|
OAUTH2_TOKEN_FILE Chemin vers le fichier de token OAuth2 source
|
|
PASS2 Mot de passe admin destination
|
|
IMAPSYNC Chemin vers l'exécutable imapsync (défaut: /usr/bin/imapsync)
|
|
OAUTH2IMAP Chemin vers l'exécutable oauth2imap (défaut: /usr/bin/oauth2imap)
|
|
TMPDIR_CACHE Répertoire cache imapsync (défaut: /tmp/imapsync_cache)
|
|
N_WORKERS Nombre de synchros parallèles (défaut: 4)
|
|
POLL_INTERVAL Intervalle de polling en secondes (défaut: 30)
|
|
NEW_RATIO Part des slots réservés aux nouvelles BAL (défaut: 0.20)
|
|
"""
|
|
|
|
import argparse
|
|
import logging
|
|
import math
|
|
import os
|
|
import re
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Arguments CLI
|
|
# ─────────────────────────────────────────────
|
|
|
|
# Command-line interface: two optional boolean switches, parsed once at import.
parser = argparse.ArgumentParser(
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description="Daemon imapsync piloté par Grist",
)

# --dry is forwarded to imapsync (see build_imapsync_cmd).
parser.add_argument(
    "--dry",
    help="Passe --dry à imapsync : simulation sans écriture sur le serveur destination",
    action="store_true",
)

# --test runs the configuration self-check instead of the daemon loop.
parser.add_argument(
    "--test",
    help="Vérifie la configuration et la connectivité, puis quitte (0=OK, 1=erreur)",
    action="store_true",
)

# Parsed namespace used by the rest of the module (ARGS.dry / ARGS.test).
ARGS = parser.parse_args()
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Configuration depuis l'environnement
|
|
# ─────────────────────────────────────────────
|
|
|
|
|
|
def require_env(name: str) -> str:
    """Return the value of a mandatory environment variable.

    An unset or empty variable counts as missing: a critical message is logged
    on the root logger and the process exits with status 1.
    """
    value = os.environ.get(name)
    if value:
        return value
    logging.critical(f"Variable d'environnement manquante : {name}")
    sys.exit(1)
|
|
|
|
|
|
def opt_env(name: str, default: str) -> str:
    """Return an optional environment variable, or ``default`` when it is unset.

    A variable that is set to the empty string is returned as-is.
    """
    try:
        return os.environ[name]
    except KeyError:
        return default
|
|
|
|
|
|
def load_dotenv(path: str = ".env"):
    """Load ``KEY=VALUE`` lines from a dotenv-style file into ``os.environ``.

    Minimal loader with no external dependency.

    - Blank lines, lines starting with ``#`` and lines without ``=`` are skipped.
    - An optional leading ``export `` before the key is ignored.
    - One pair of matching surrounding quotes (single or double) is removed
      from the value; unbalanced quotes are kept, since they belong to the value.
    - Variables already present in the environment are never overwritten.
    - A missing file (or a path that is not a regular file) is silently ignored.

    Args:
        path: Location of the dotenv file, relative to the working directory
            unless absolute.
    """
    env_file = Path(path)
    if not env_file.is_file():
        return

    # utf-8-sig: decode as UTF-8 whatever the locale is, and drop a leading BOM
    # so that it does not end up inside the first key.
    for raw_line in env_file.read_text(encoding="utf-8-sig").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue

        key, _, val = line.partition("=")
        key = key.strip().removeprefix("export ").strip()
        if not key:
            # "=value" has no usable name; os.environ would reject an empty key.
            continue

        val = val.strip()
        if len(val) >= 2 and val[0] == val[-1] and val[0] in ("'", '"'):
            val = val[1:-1]

        os.environ.setdefault(key, val)
|
|
|
|
|
|
# Populate os.environ from ./.env before any setting is read. Variables that
# are already exported keep precedence (load_dotenv uses setdefault).
load_dotenv()


def _numeric_env(name: str, default: str, cast, minimum, maximum=None):
    """Read a numeric setting; log a critical message and exit(1) if invalid.

    ``cast`` is ``int`` or ``float``. The value must satisfy
    ``minimum <= value`` and, when given, ``value <= maximum``.
    """
    raw = opt_env(name, default)
    try:
        value = cast(raw)
    except ValueError:
        logging.critical(f"Variable d'environnement invalide : {name}={raw!r}")
        sys.exit(1)
    # Written as a positive range test so that NaN is rejected as well.
    in_range = minimum <= value and (maximum is None or value <= maximum)
    if not in_range:
        bounds = f">= {minimum}" if maximum is None else f"entre {minimum} et {maximum}"
        logging.critical(
            f"Variable d'environnement hors limites : {name}={raw!r} (attendu : {bounds})"
        )
        sys.exit(1)
    return value


# Grist access.
GRIST_API_KEY = require_env("GRIST_API_KEY")
GRIST_DOC_ID = require_env("GRIST_DOC_ID")
GRIST_BASE_URL = opt_env("GRIST_BASE_URL", "http://localhost:8484")

# IMAP endpoints and admin credentials passed to imapsync.
HOST1 = require_env("HOST1")
HOST2 = require_env("HOST2")
USER1_PREFIX = require_env("USER1_PREFIX")
USER2_PREFIX = require_env("USER2_PREFIX")
OAUTH2_TOKEN_FILE = require_env("OAUTH2_TOKEN_FILE")
PASS2 = require_env("PASS2")

# External executables and working directory.
IMAPSYNC = opt_env("IMAPSYNC", "/usr/bin/imapsync")
IMAPSYNC_TIMEOUT = 72000  # seconds, used as the subprocess timeout of one job
NB_RESERVE = 1  # slots that partition_bals keeps exclusively for re-passes
OAUTH2IMAP = opt_env("OAUTH2IMAP", "/usr/bin/oauth2imap")
TMPDIR_CACHE = opt_env("TMPDIR_CACHE", "/tmp/imapsync_cache")

# Scheduling parameters. The bounds reject values that would otherwise crash
# later: ThreadPoolExecutor needs at least one worker, time.sleep refuses a
# negative delay, and partition_bals asserts on a ratio outside [0, 1].
N_WORKERS = _numeric_env("N_WORKERS", "4", int, 1)
POLL_INTERVAL = _numeric_env("POLL_INTERVAL", "30", int, 0)
NEW_RATIO = _numeric_env("NEW_RATIO", "0.20", float, 0.0, 1.0)

# Minimum delay, in minutes, between two passes on the same mailbox.
MIN_INTERVAL_MINUTES = 15

# Grist table names (exact case, as displayed in Grist).
TABLE_BALS = "BALs"
TABLE_HISTORIQUE = "Historique"

# Grist column names.
COL_EMAIL = "Courriel"
COL_SYNC = "Synchronisation"  # boolean trigger
COL_NB_SYNCS = "Nb_syncs"  # counter computed by a Grist formula

TOKEN_VALIDITY = 30  # minutes during which a renewed OAuth2 token is reused

# time.time() of the last successful renewal; 0.0 forces a renewal at start-up.
_LAST_RENEWAL_TIME = 0.0
|
|
# ─────────────────────────────────────────────
|
|
# Logging
|
|
# ─────────────────────────────────────────────
|
|
|
|
# Root logger: INFO and above, sent both to stdout and to a UTF-8 log file
# named imapsync_daemon.log (path relative to the working directory).
logging.basicConfig(
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("imapsync_daemon.log", encoding="utf-8"),
    ],
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Module-level logger used by every function of the daemon.
log = logging.getLogger(__name__)
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Renouvellement du token OAuth2
|
|
# ─────────────────────────────────────────────
|
|
|
|
|
|
def renew_oauth2_token() -> bool:
    """Refresh the OAuth2 token by running the oauth2imap helper.

    The command is ``OAUTH2IMAP --token_file OAUTH2_TOKEN_FILE USER1_PREFIX``.
    It is skipped, and True is returned, while the last successful renewal is
    younger than TOKEN_VALIDITY minutes.

    Returns:
        True if the token is considered fresh or was renewed; False on any
        failure (non-zero exit code, 60 s timeout, missing executable, other
        exception). Failures are logged and never raised, so the caller can
        carry on with the existing token.
    """
    global _LAST_RENEWAL_TIME

    started_at = time.time()
    token_age = started_at - _LAST_RENEWAL_TIME
    if token_age < TOKEN_VALIDITY * 60:
        log.debug("Le token est encore valide. Pas de renouvellement nécessaire.")
        return True

    helper_cmd = [OAUTH2IMAP, "--token_file", OAUTH2_TOKEN_FILE, USER1_PREFIX]
    log.info(f"Renouvellement du token OAuth2 ({USER1_PREFIX})…")

    try:
        proc = subprocess.run(helper_cmd, timeout=60, text=True, capture_output=True)
        if proc.returncode != 0:
            # Prefer stderr, fall back to stdout; keep the log line short.
            detail = (proc.stderr or proc.stdout).strip()[:300]
            log.error(
                f"Échec du renouvellement du token (code {proc.returncode}) : {detail}"
            )
            return False

        log.info("Token OAuth2 renouvelé avec succès.")
        # The timestamp taken before the call is stored, not the completion time.
        _LAST_RENEWAL_TIME = started_at
        return True
    except subprocess.TimeoutExpired:
        log.error("Timeout lors du renouvellement du token OAuth2 (>60s).")
        return False
    except FileNotFoundError:
        log.error(f"Exécutable oauth2imap introuvable : {OAUTH2IMAP}")
        return False
    except Exception as e:
        log.error(f"Erreur inattendue lors du renouvellement du token : {e}")
        return False
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Mode --test
|
|
# ─────────────────────────────────────────────
|
|
|
|
|
|
def run_tests() -> bool:
    """Run the ``--test`` self-check and print a human-readable report.

    Checks, in order: presence of every configuration value, the imapsync and
    oauth2imap executables, the OAuth2 token file, the cache directory (created
    when missing), one OAuth2 token renewal, and read access to the Grist
    document and to both tables. Nothing is written to Grist.

    Returns:
        True when every check passed, False if at least one failed.
    """
    ok = True
    sep = "─" * 56

    def check(label: str, passed: bool, detail: str = ""):
        """Print one result line and remember any failure."""
        nonlocal ok
        icon = "✓" if passed else "✗"
        msg = f" {icon} {label}"
        if detail:
            msg += f" → {detail}"
        print(msg)
        if not passed:
            ok = False

    def section(title: str) -> None:
        """Print a section header framed by two separator lines."""
        print(f"\n{sep}")
        print(f" {title}")
        print(sep)

    print()
    print("═" * 56)
    print(" imapsync_daemon — test de configuration")
    print("═" * 56)

    # ── Environment variables ────────────────────
    section("Variables d'environnement")
    # (name, value tested for truthiness, text displayed when set).
    # Secrets display nothing (None) or a neutral word instead of their value.
    env_checks = [
        ("GRIST_API_KEY", GRIST_API_KEY, None),
        ("GRIST_DOC_ID", GRIST_DOC_ID, None),
        ("GRIST_BASE_URL", GRIST_BASE_URL, GRIST_BASE_URL),
        ("HOST1", HOST1, HOST1),
        ("HOST2", HOST2, HOST2),
        ("USER1_PREFIX", USER1_PREFIX, USER1_PREFIX),
        ("USER2_PREFIX", USER2_PREFIX, USER2_PREFIX),
        ("OAUTH2_TOKEN_FILE", OAUTH2_TOKEN_FILE, OAUTH2_TOKEN_FILE),
        ("PASS2", PASS2, "défini"),
        ("IMAPSYNC", IMAPSYNC, IMAPSYNC),
        ("OAUTH2IMAP", OAUTH2IMAP, OAUTH2IMAP),
        ("TMPDIR_CACHE", TMPDIR_CACHE, TMPDIR_CACHE),
        ("N_WORKERS", str(N_WORKERS), str(N_WORKERS)),
        ("POLL_INTERVAL", str(POLL_INTERVAL), f"{POLL_INTERVAL}s"),
        ("NEW_RATIO", str(NEW_RATIO), f"{NEW_RATIO:.0%}"),
    ]
    for name, val, shown in env_checks:
        set_ok = bool(val)
        check(name, set_ok, shown if set_ok else "MANQUANTE")

    # ── Files and directories ────────────────────
    section("Fichiers et répertoires")

    for exe_name, exe_path in [("imapsync", IMAPSYNC), ("oauth2imap", OAUTH2IMAP)]:
        p = Path(exe_path)
        found = p.exists() and os.access(p, os.X_OK)
        check(
            f"{exe_name} exécutable",
            found,
            str(p) + (" (trouvé)" if found else " (introuvable)"),
        )

    token_p = Path(OAUTH2_TOKEN_FILE)
    # The displayed detail follows the same condition as the verdict, so a
    # directory at that path is no longer reported as "found".
    token_found = token_p.is_file()
    check(
        "OAUTH2_TOKEN_FILE",
        token_found,
        str(token_p) + (" (trouvé)" if token_found else " (introuvable)"),
    )

    tmpdir_p = Path(TMPDIR_CACHE)
    if tmpdir_p.is_dir():
        check("TMPDIR_CACHE", True, f"{tmpdir_p} (existe)")
    else:
        try:
            # Also fails, with an explicit OSError, when the path exists but
            # is not a directory.
            tmpdir_p.mkdir(parents=True, exist_ok=True)
            check("TMPDIR_CACHE", True, f"{tmpdir_p} (créé)")
        except OSError as e:
            check("TMPDIR_CACHE", False, str(e))

    # ── Token renewal ────────────────────────────
    section("Renouvellement token OAuth2")
    token_ok = renew_oauth2_token()
    check(
        "oauth2imap --token_file … renew",
        token_ok,
        "succès" if token_ok else "échec — vérifier oauth2imap et le token",
    )

    # ── Grist connectivity ───────────────────────
    section("Connectivité Grist")

    grist_headers = {
        "Authorization": f"Bearer {GRIST_API_KEY}",
        "Content-Type": "application/json",
    }

    try:
        resp = requests.get(
            f"{GRIST_BASE_URL}/api/docs/{GRIST_DOC_ID}",
            headers=grist_headers,
            timeout=10,
        )
        resp.raise_for_status()
        doc_name = resp.json().get("name", "?")
        check("Accès document Grist", True, f'"{doc_name}"')
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body with requests versions whose
        # JSON error is not a RequestException.
        check("Accès document Grist", False, str(e))

    for table in (TABLE_BALS, TABLE_HISTORIQUE):
        try:
            resp = requests.get(
                f"{GRIST_BASE_URL}/api/docs/{GRIST_DOC_ID}/tables/{table}/records",
                headers=grist_headers,
                timeout=10,
            )
            resp.raise_for_status()
            records = resp.json().get("records", [])
        except (requests.RequestException, ValueError) as e:
            check(f"Lecture table {table}", False, str(e))
            continue

        nb = len(records)
        detail = f"{nb} enregistrements"
        if table == TABLE_BALS and nb > 0:
            # partition_bals relies on this column to tell new mailboxes from
            # re-passes, so its absence is pointed out (as a warning only).
            sample = records[0].get("fields", {})
            if COL_NB_SYNCS not in sample:
                detail += f" ⚠ colonne '{COL_NB_SYNCS}' absente — à créer dans Grist"
        check(f"Lecture table {table}", True, detail)

    # ── Computed parameters ──────────────────────
    section("Paramètres calculés")
    slots_new = math.ceil(N_WORKERS * NEW_RATIO)
    slots_rep = N_WORKERS - slots_new
    print(f" • N_WORKERS={N_WORKERS} NEW_RATIO={NEW_RATIO:.0%}")
    print(f" → {slots_new} slot(s) nouvelles BAL")
    print(f" → {slots_rep} slot(s) re-passes")
    if ARGS.dry:
        print(" • Mode --dry actif : imapsync sera lancé avec --dry")

    # ── Summary ──────────────────────────────────
    print()
    print("═" * 56)
    if ok:
        print(" Résultat : OK — configuration valide")
    else:
        print(" Résultat : ERREUR — corriger les points marqués ✗")
    print("═" * 56)
    print()
    return ok
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Grist API
|
|
# ─────────────────────────────────────────────
|
|
|
|
# HTTP headers sent with the Grist REST calls of this module:
# bearer authentication with the API key, JSON content type.
_GRIST_BEARER = "Bearer {}".format(GRIST_API_KEY)
GRIST_HEADERS = {"Authorization": _GRIST_BEARER, "Content-Type": "application/json"}
del _GRIST_BEARER
|
|
|
|
|
|
def grist_url(table: str) -> str:
    """Return the ``records`` endpoint URL of ``table`` in the configured document."""
    document_api = f"{GRIST_BASE_URL}/api/docs/{GRIST_DOC_ID}"
    return f"{document_api}/tables/{table}/records"
|
|
|
|
|
|
def grist_fetch_bals() -> list[dict]:
    """Fetch the mailboxes that are due for a synchronisation pass.

    The BALs table is requested sorted by ``Date_derniere_passe``. A record is
    kept when its ``Synchronisation`` field is exactly True and its last pass,
    if any, is at least MIN_INTERVAL_MINUTES old. A last-pass value that cannot
    be converted to a float is logged and the record is kept.

    Returns:
        Flattened records (``{"id": ..., **fields}``) in the order returned by
        Grist, or an empty list when the request fails.
    """
    try:
        resp = requests.get(
            grist_url(TABLE_BALS),
            headers=GRIST_HEADERS,
            params={"sort": "Date_derniere_passe"},
            timeout=15,
        )
        resp.raise_for_status()

        now = time.time()
        due = []
        for rec in resp.json().get("records", []):
            fields = rec["fields"]

            # Only mailboxes explicitly flagged for synchronisation.
            if fields.get(COL_SYNC) is not True:
                continue

            # The last pass is compared with time.time(), i.e. read as epoch seconds.
            last_pass = fields.get("Date_derniere_passe")
            if last_pass is not None:
                try:
                    age = now - float(last_pass)
                    if age < MIN_INTERVAL_MINUTES * 60:
                        log.debug(
                            f"BAL {rec['id']} ignorée : passée il y a seulement {int(age // 60)} min."
                        )
                        continue
                except (ValueError, TypeError):
                    log.warning(
                        f"Format de timestamp invalide pour la BAL {rec['id']} : {last_pass}. "
                        f"Incluse par sécurité."
                    )

            # Never synchronised, old enough, or unreadable timestamp: keep it.
            due.append({"id": rec["id"], **fields})

        return due

    except requests.RequestException as e:
        log.error(f"Erreur lecture Grist BALS : {e}")
        return []
|
|
|
|
|
|
def grist_priority_bals() -> dict:
    """Return one priority mailbox that has never been synchronised, if any.

    Asks Grist for at most one BALs record whose ``priority`` and
    ``Synchronisation`` columns are both true.

    Returns:
        The record flattened as ``{"id": ..., **fields}`` (the shape produced
        by grist_fetch_bals and read by run_sync_job), or an empty dict when no
        record matches, when the record already has a ``Date_derniere_passe``
        value, or when the Grist request fails.
    """
    import json  # local import: json is not imported at file level

    params = {
        "limit": 1,
        # The Grist records API takes `filter` as a JSON-encoded object. A
        # nested dict passed to requests would be sent as repeated
        # `filter=<column name>` pairs, which is not a valid filter.
        "filter": json.dumps({"priority": [True], COL_SYNC: [True]}),
    }
    try:
        resp = requests.get(
            grist_url(TABLE_BALS), headers=GRIST_HEADERS, params=params, timeout=15
        )
        resp.raise_for_status()
        records = resp.json().get("records", [])
    except requests.RequestException as e:
        log.error(f"Erreur lecture Grist BALS : {e}")
        return {}

    # No priority mailbox is the normal case, not an error.
    if not records:
        return {}

    record = records[0]
    fields = record.get("fields", {})

    # Re-check the trigger locally rather than trusting the server-side filter.
    if fields.get(COL_SYNC) is not True:
        return {}

    # A mailbox that already had a pass is left to the regular scheduling.
    if fields.get("Date_derniere_passe") is not None:
        return {}

    return {"id": record.get("id"), **fields}
|
|
|
|
|
|
def grist_add_historique(
    email: str,
    status: str,
    nb_emails: int,
    duree_sec: int,
    log_text: str,
    dry: bool = False,
):
    """Append one result row to the Grist history table.

    Args:
        email: Mailbox address, stored in the ``Bal`` column.
        status: Job status; prefixed with ``dry:`` when ``dry`` is true.
        nb_emails: Number of transferred messages.
        duree_sec: Job duration in seconds.
        log_text: Log excerpt; only its first 4000 characters are stored.
        dry: Whether the job ran in simulation mode.

    Failures are logged and never raised.
    """
    final_status = ("dry:" + status) if dry else status
    payload = {
        "records": [
            {
                "fields": {
                    "Bal": email,
                    "Status": final_status,
                    "Nb_emails": nb_emails,
                    "Duree_sec": duree_sec,
                    "Log": log_text[:4000],
                    "Date": datetime.now(timezone.utc).isoformat(),
                }
            }
        ]
    }
    try:
        resp = requests.post(
            grist_url(TABLE_HISTORIQUE), headers=GRIST_HEADERS, json=payload, timeout=15
        )
        resp.raise_for_status()
    except requests.RequestException as e:
        # When the request itself failed (connection error, timeout) there is
        # no response object at all; only HTTP errors carry a body worth logging.
        failed_response = getattr(e, "response", None)
        body = failed_response.text if failed_response is not None else ""
        log.error(f"[{email}] Erreur écriture Grist HISTORIQUE : {e} \n {body}")
        return

    log.info(f"[{email}] Résultat écrit dans HISTORIQUE ({final_status})")
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Partition nouvelles BAL / re-passes
|
|
# ─────────────────────────────────────────────
|
|
|
|
|
|
def partition_bals(rows: list[dict], available: int) -> list[dict]:
    """Choose which mailboxes fill the ``available`` worker slots.

    Rows with a falsy ``Nb_syncs`` are "new" mailboxes, the others are
    re-passes. The slot count is capped at N_WORKERS, then:

    - NB_RESERVE slots are kept for re-passes only;
    - the remaining slots are shared, ``ceil(shared * NEW_RATIO)`` of them
      going to new mailboxes and the rest to re-passes;
    - shared slots that one queue cannot fill are offered to the other queue;
      the reserved slots are never offered to new mailboxes.

    Returns:
        The selected rows, alternating new / re-pass while both remain.
    """
    capacity = min(available, N_WORKERS)
    if capacity <= 0:
        return []

    # Split the input into the two queues, keeping the incoming order.
    first_timers, returning = [], []
    for row in rows:
        (returning if row.get(COL_NB_SYNCS) else first_timers).append(row)

    # Reserved slots come off the top; only the remainder is shared.
    reserved = min(NB_RESERVE, capacity)
    shared = capacity - reserved

    new_quota = math.ceil(shared * NEW_RATIO)
    rep_quota_shared = shared - new_quota
    rep_quota = rep_quota_shared + reserved

    # First pass: each queue takes at most its own quota.
    chosen_new = first_timers[:new_quota]
    chosen_rep = returning[:rep_quota]

    # Unused shared slots, measured before any redistribution. For re-passes
    # only the shared part of the quota counts, never the reserve.
    unused_new = new_quota - len(chosen_new)
    unused_rep_shared = max(0, rep_quota_shared - len(chosen_rep))

    if unused_new > 0:
        chosen_rep.extend(returning[rep_quota : rep_quota + unused_new])
    if unused_rep_shared > 0:
        chosen_new.extend(first_timers[new_quota : new_quota + unused_rep_shared])

    # Invariants: new mailboxes never use the reserve, and the total fits.
    assert len(chosen_new) <= shared
    assert len(chosen_new) + len(chosen_rep) <= capacity

    # Interleave new / re-pass one by one, then append whatever is left over.
    ordered = []
    for idx in range(max(len(chosen_new), len(chosen_rep))):
        if idx < len(chosen_new):
            ordered.append(chosen_new[idx])
        if idx < len(chosen_rep):
            ordered.append(chosen_rep[idx])
    return ordered
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Parse de la sortie imapsync
|
|
# ─────────────────────────────────────────────
|
|
|
|
|
|
# Summary line printed by imapsync. The count sits on a line in the middle of
# the output, the amount of whitespace around ":" is not fixed, and the number
# may be followed by more text, so the pattern is line-anchored (MULTILINE) and
# does not require the number to end the line.
_RE_MESSAGES_TRANSFERRED = re.compile(
    r"^\s*Messages transferred\s*:\s*(\d+)", re.IGNORECASE | re.MULTILINE
)
_RE_EXIT_ZERO = re.compile(r"Exiting with return value\s+0")
_RE_EXIT_NONZERO = re.compile(r"Exiting with return value\s+[^0]")
_RE_ERROR_LINE = re.compile(
    r"^(Error|Err|FATAL):?\s+(.+)$", re.MULTILINE | re.IGNORECASE
)


def parse_imapsync_output(output: str) -> dict:
    """Extract the message count, status and first error from imapsync output.

    Args:
        output: Text captured from an imapsync run.

    Returns:
        A dict with:
        - ``nb_emails``: number from the first "Messages transferred" line,
          0 when there is none;
        - ``status``: ``"error"`` when an "Exiting with return value" line
          carries a value not starting with 0 and none carries 0, otherwise
          ``"success"`` (also the default when no such line is present);
        - ``error_msg``: text following the first line that starts with
          ``Error``, ``Err`` or ``FATAL`` (case-insensitive), or ``""``.
    """
    nb_emails = 0
    status = "success"
    error_msg = ""

    transferred = _RE_MESSAGES_TRANSFERRED.search(output)
    if transferred:
        nb_emails = int(transferred.group(1))

    if _RE_EXIT_ZERO.search(output):
        status = "success"
    elif _RE_EXIT_NONZERO.search(output):
        status = "error"

    first_error = _RE_ERROR_LINE.search(output)
    if first_error:
        error_msg = first_error.group(2).strip()

    return {"nb_emails": nb_emails, "status": status, "error_msg": error_msg}
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Commande imapsync
|
|
# ─────────────────────────────────────────────
|
|
|
|
|
|
def build_imapsync_cmd(email: str, dry: bool = False) -> list[str]:
    """Build the imapsync argument list for one mailbox.

    Args:
        email: Mailbox address, used as both ``--user1`` and ``--user2``.
        dry: When true, ``--dry`` is appended as the last argument.

    Returns:
        The command as a list of arguments, starting with the IMAPSYNC path.
    """
    # NOTE(review): PASS2 is passed on the command line and is therefore
    # visible in the process list; imapsync also has --passfile2 — consider it.
    cmd = [
        IMAPSYNC,
        "--host1", HOST1,
        "--user1", email,
        "--authuser1", USER1_PREFIX,
        "--oauthaccesstoken1", OAUTH2_TOKEN_FILE,
        "--host2", HOST2,
        "--user2", email,
        "--authuser2", USER2_PREFIX,
        "--password2", PASS2,
        "--nofoldersizes",
        "--noreleasecheck",
        "--office1",
        "--automap",
        "--delete2duplicates",
        "--addheader",
        "--tmpdir", TMPDIR_CACHE,
        "--usecache",
    ]

    # Source folders that are not migrated. Accented names are written in
    # IMAP modified UTF-7 (e.g. "&AOk-" for "é").
    excluded_folders = [
        r"^Calendrier(/|$)",
        r"^Contacts(/|$)",
        r"^Boîte d'envoi(/|$)",
        # NOTE(review): every other accented folder here is modified UTF-7;
        # the encoded form of the one above is added for consistency —
        # confirm against the "Host1 folders list" printed by imapsync.
        r"^Bo&AO4-te d'envoi(/|$)",
        r"^Flux RSS(/|$)",
        r"^Historique des conversations(/|$)",
        r"^Journal(/|$)",
        r"^Notes(/|$)",
        r"^Probl&AOg-mes de synchronisation(/|$)",
        r"^R&AOk-p&AOk-t&AOk-(/|$)",
        r"^T&AOI-ches(/|$)",
        r"^Contacts sugg&AOk-r&AOk-s(/|$)",
    ]
    for pattern in excluded_folders:
        cmd += ["--exclude", pattern]

    # Folder renames (source=destination).
    cmd += ["--f1f2", "Courrier indésirable=Pourriel"]
    # NOTE(review): modified UTF-7 spelling of the same folder, see above.
    cmd += ["--f1f2", "Courrier ind&AOk-sirable=Pourriel"]

    # One mapping per yearly archive folder, from 2000 up to the current year
    # (never fewer years than the former fixed 2000-2026 range).
    last_year = max(2026, datetime.now().year)
    for year in range(2000, last_year + 1):
        cmd += ["--f1f2", f"Archive/{year}=Archives.{year}"]
    cmd += ["--f1f2", "Archive=Archives"]

    if dry:
        cmd.append("--dry")

    return cmd
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Worker : exécute un job imapsync
|
|
# ─────────────────────────────────────────────
|
|
|
|
# E-mail addresses whose imapsync job is currently executing in a worker.
_running: set[str] = set()


def run_sync_job(row: dict, dry: bool = False) -> None:
    """Run imapsync for one mailbox and record the outcome in Grist.

    Args:
        row: Flattened Grist record; the address is read from its
            ``Courriel`` field.
        dry: Forwarded to build_imapsync_cmd and to the history entry.

    The job is skipped when the row has no address or when the same address
    is already being processed. Every outcome (success, error, timeout,
    unexpected exception) is written with grist_add_historique; nothing is
    raised to the caller.
    """
    # `or ""` covers an address cell that holds None instead of a string.
    email = (row.get(COL_EMAIL) or "").strip()
    if not email:
        log.warning(f"Ligne {row.get('id')} sans courriel, ignorée.")
        return
    if email in _running:
        log.info(f"[{email}] Déjà en cours, skip.")
        return

    _running.add(email)
    dry_tag = " [DRY]" if dry else ""
    log.info(f"[{email}] Démarrage{dry_tag}")

    cmd = build_imapsync_cmd(email, dry=dry)
    start = time.monotonic()

    try:
        # errors="replace": a stray undecodable byte in the imapsync output
        # must not turn a finished transfer into an "unexpected exception".
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=IMAPSYNC_TIMEOUT,
        )
        output = result.stdout + "\n" + result.stderr
        elapsed = int(time.monotonic() - start)
        parsed = parse_imapsync_output(output)

        # The exit code wins over a log that looks successful.
        if result.returncode != 0 and parsed["status"] == "success":
            parsed["status"] = "error"
            parsed["error_msg"] = f"Exit code {result.returncode}"

        if parsed["status"] == "error":
            # grist_add_historique stores only the first 4000 characters, so
            # the tail is passed: the end of the output (including stderr)
            # is kept rather than the beginning.
            log_text = output[-4000:]
        else:
            log_text = result.stdout[-3000:]

        log.info(
            f"[{email}] Terminé{dry_tag} — statut={parsed['status']} "
            f"emails={parsed['nb_emails']} durée={elapsed}s"
        )
        grist_add_historique(
            email, parsed["status"], parsed["nb_emails"], elapsed, log_text, dry=dry
        )

    except subprocess.TimeoutExpired:
        elapsed = int(time.monotonic() - start)
        log.error(f"[{email}] Timeout après {elapsed}s")
        grist_add_historique(
            email, "timeout", 0, elapsed, f"Timeout {elapsed}s dépassé", dry=dry
        )

    except Exception as e:
        elapsed = int(time.monotonic() - start)
        log.error(f"[{email}] Exception inattendue : {e}")
        grist_add_historique(email, "error", 0, elapsed, str(e), dry=dry)

    finally:
        _running.discard(email)
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
# Boucle principale
|
|
# ─────────────────────────────────────────────
|
|
|
|
# Set to True by handle_signal; polled by the main loop to stop cleanly.
_shutdown = False


def handle_signal(signum, frame):
    """Signal handler: log the received signal and request a clean shutdown."""
    global _shutdown
    log.info(f"Signal {signum} reçu — arrêt propre en cours…")
    _shutdown = True


# Same handler for SIGTERM and SIGINT, installed in that order at import time.
for _sig in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_sig, handle_signal)
del _sig
|
|
|
|
|
|
def main():
    """Run the daemon loop until a shutdown signal is received.

    Each cycle: collect finished jobs, renew the OAuth2 token if needed, fill
    the free worker slots with mailboxes chosen by partition_bals, submit the
    priority mailbox if there is one, then wait POLL_INTERVAL seconds. Leaving
    the loop waits for the jobs still in the pool before returning.
    """
    dry = ARGS.dry
    log.info("═" * 60)
    log.info(
        f"imapsync daemon démarré — workers={N_WORKERS} "
        f"poll={POLL_INTERVAL}s new_ratio={NEW_RATIO:.0%}"
        + (" [MODE DRY]" if dry else "")
    )
    log.info(f"Source : {HOST1} → Destination : {HOST2}")
    log.info("═" * 60)

    Path(TMPDIR_CACHE).mkdir(parents=True, exist_ok=True)

    with ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
        # future -> e-mail address of every job submitted and not yet collected.
        futures: dict = {}

        def email_of(row: dict) -> str:
            """Address of a row; tolerates a missing or None cell."""
            return (row.get(COL_EMAIL) or "").strip()

        def is_busy(email: str) -> bool:
            """True when a job for this address is running or still queued.

            _running only knows jobs that a worker has started; a job waiting
            in the pool queue is only visible through `futures`.
            """
            return email in _running or email in futures.values()

        def submit(row: dict, email: str) -> None:
            """Hand one mailbox to the pool and track its future."""
            future = pool.submit(run_sync_job, row, dry)
            futures[future] = email
            log.info(f"[{email}] Job soumis au pool")

        while not _shutdown:
            # Collect finished futures and surface any unhandled exception.
            for f in [f for f in futures if f.done()]:
                email = futures.pop(f)
                try:
                    f.result()
                except Exception as e:
                    log.error(f"[{email}] Erreur non gérée : {e}")

            # Token renewal at every polling cycle (cached inside the function).
            renew_oauth2_token()

            # Fill the free slots.
            available = N_WORKERS - len(futures)
            if available > 0:
                rows = [r for r in grist_fetch_bals() if not is_busy(email_of(r))]
                for row in partition_bals(rows, available):
                    email = email_of(row)
                    # Two rows sharing an address in the same batch: the
                    # second would only be skipped or run twice.
                    if is_busy(email):
                        continue
                    submit(row, email)

            # Priority mailbox: submitted even when every slot is taken (it
            # then waits in the pool queue), but never while a job for the
            # same address is already running or queued.
            row = grist_priority_bals()
            email = email_of(row)
            if email and not is_busy(email):
                submit(row, email)

            # Wait in short slices: a plain sleep(POLL_INTERVAL) resumes after
            # the signal handler returns, delaying shutdown by a full interval.
            deadline = time.monotonic() + POLL_INTERVAL
            while not _shutdown:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                time.sleep(min(1.0, remaining))

    log.info("Daemon arrêté proprement.")
|
|
|
|
|
|
# Script entry point: --test runs the self-check and exits with 0 (OK) or
# 1 (error); otherwise the daemon loop is started.
if __name__ == "__main__":
    if not ARGS.test:
        main()
    else:
        sys.exit(0 if run_tests() else 1)
|