init
This commit is contained in:
Executable
+766
@@ -0,0 +1,766 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
imapsync_daemon.py
|
||||
──────────────────
|
||||
Daemon d'orchestration de migrations IMAP depuis Grist.
|
||||
|
||||
Fonctionnement :
|
||||
1. Renouvellement du token OAuth2 (au démarrage et à chaque cycle)
|
||||
2. Polling de la table BALS dans Grist (colonne Synchronisation = True)
|
||||
3. Partition nouvelles BAL / re-passes selon NEW_RATIO
|
||||
4. Lancement de N workers imapsync en parallèle
|
||||
5. Parse de la sortie imapsync (nb emails, statut, erreurs)
|
||||
6. Écriture des résultats dans la table HISTORIQUE de Grist
|
||||
|
||||
Modes de lancement :
|
||||
(aucun argument) Mode daemon normal
|
||||
--dry Ajoute --dry à imapsync (simulation, aucune écriture IMAP)
|
||||
--test Vérifie toutes les variables d'environnement, la connectivité
|
||||
Grist et la présence d'imapsync, puis quitte (0=OK, 1=erreur)
|
||||
|
||||
Variables d'environnement requises (fichier .env ou export shell) :
|
||||
GRIST_API_KEY Clé API Grist
|
||||
GRIST_DOC_ID ID du document Grist
|
||||
GRIST_BASE_URL URL de base Grist (ex: https://grist.example.fr)
|
||||
HOST1 Serveur IMAP source
|
||||
HOST2 Serveur IMAP destination
|
||||
USER1_PREFIX Compte admin source (authuser1 / identité OAuth2)
|
||||
USER2_PREFIX Compte admin destination (authuser2)
|
||||
OAUTH2_TOKEN_FILE Chemin vers le fichier de token OAuth2 source
|
||||
PASS2 Mot de passe admin destination
|
||||
IMAPSYNC Chemin vers l'exécutable imapsync (défaut: /usr/bin/imapsync)
|
||||
OAUTH2IMAP Chemin vers l'exécutable oauth2imap (défaut: /usr/bin/oauth2imap)
|
||||
TMPDIR_CACHE Répertoire cache imapsync (défaut: /tmp/imapsync_cache)
|
||||
N_WORKERS Nombre de synchros parallèles (défaut: 4)
|
||||
POLL_INTERVAL Intervalle de polling en secondes (défaut: 30)
|
||||
NEW_RATIO Part des slots réservés aux nouvelles BAL (défaut: 0.20)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
import re
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
|
||||
# ─────────────────────────────────────────────
|
||||
# Arguments CLI
|
||||
# ─────────────────────────────────────────────
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Daemon imapsync piloté par Grist",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry",
|
||||
action="store_true",
|
||||
help="Passe --dry à imapsync : simulation sans écriture sur le serveur destination",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--test",
|
||||
action="store_true",
|
||||
help="Vérifie la configuration et la connectivité, puis quitte (0=OK, 1=erreur)",
|
||||
)
|
||||
ARGS = parser.parse_args()
|
||||
|
||||
# ─────────────────────────────────────────────
|
||||
# Configuration depuis l'environnement
|
||||
# ─────────────────────────────────────────────
|
||||
|
||||
|
||||
def require_env(name: str) -> str:
|
||||
val = os.environ.get(name)
|
||||
if not val:
|
||||
logging.critical(f"Variable d'environnement manquante : {name}")
|
||||
sys.exit(1)
|
||||
return val
|
||||
|
||||
|
||||
def opt_env(name: str, default: str) -> str:
|
||||
return os.environ.get(name, default)
|
||||
|
||||
|
||||
def load_dotenv(path: str = ".env"):
|
||||
"""Chargement .env simple sans dépendance externe."""
|
||||
p = Path(path)
|
||||
if p.exists():
|
||||
for line in p.read_text().splitlines():
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#") or "=" not in line:
|
||||
continue
|
||||
key, _, val = line.partition("=")
|
||||
os.environ.setdefault(key.strip(), val.strip().strip('"').strip("'"))
|
||||
|
||||
|
||||
load_dotenv()
|
||||
|
||||
GRIST_API_KEY = require_env("GRIST_API_KEY")
|
||||
GRIST_DOC_ID = require_env("GRIST_DOC_ID")
|
||||
GRIST_BASE_URL = opt_env("GRIST_BASE_URL", "http://localhost:8484")
|
||||
HOST1 = require_env("HOST1")
|
||||
HOST2 = require_env("HOST2")
|
||||
USER1_PREFIX = require_env("USER1_PREFIX")
|
||||
USER2_PREFIX = require_env("USER2_PREFIX")
|
||||
OAUTH2_TOKEN_FILE = require_env("OAUTH2_TOKEN_FILE")
|
||||
PASS2 = require_env("PASS2")
|
||||
IMAPSYNC = opt_env("IMAPSYNC", "/usr/bin/imapsync")
|
||||
IMAPSYNC_TIMEOUT = 72000
|
||||
NB_RESERVE = 1
|
||||
OAUTH2IMAP = opt_env("OAUTH2IMAP", "/usr/bin/oauth2imap")
|
||||
TMPDIR_CACHE = opt_env("TMPDIR_CACHE", "/tmp/imapsync_cache")
|
||||
N_WORKERS = int(opt_env("N_WORKERS", "4"))
|
||||
POLL_INTERVAL = int(opt_env("POLL_INTERVAL", "30"))
|
||||
NEW_RATIO = float(opt_env("NEW_RATIO", "0.20"))
|
||||
|
||||
# Interval minimum entre 2 vérification d'une meme BAL
|
||||
MIN_INTERVAL_MINUTES = 15
|
||||
|
||||
|
||||
# Noms des tables Grist (casse exacte telle qu'affichée dans Grist)
|
||||
TABLE_BALS = "BALs"
|
||||
TABLE_HISTORIQUE = "Historique"
|
||||
|
||||
# Colonnes Grist
|
||||
COL_EMAIL = "Courriel"
|
||||
COL_SYNC = "Synchronisation" # booléen déclencheur
|
||||
COL_NB_SYNCS = "Nb_syncs" # compteur calculé par formule Grist
|
||||
|
||||
TOKEN_VALIDITY = 30 # Minutes
|
||||
|
||||
|
||||
_LAST_RENEWAL_TIME = 0.0
|
||||
# ─────────────────────────────────────────────
|
||||
# Logging
|
||||
# ─────────────────────────────────────────────
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
handlers=[
|
||||
logging.StreamHandler(sys.stdout),
|
||||
logging.FileHandler("imapsync_daemon.log", encoding="utf-8"),
|
||||
],
|
||||
)
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# ─────────────────────────────────────────────
|
||||
# Renouvellement du token OAuth2
|
||||
# ─────────────────────────────────────────────
|
||||
|
||||
|
||||
def renew_oauth2_token() -> bool:
|
||||
"""
|
||||
Exécute : OAUTH2IMAP --token_file OAUTH2_TOKEN_FILE USER1_PREFIX
|
||||
Retourne True si le renouvellement a réussi, False sinon.
|
||||
Le daemon continue même en cas d'échec (le token existant peut encore
|
||||
être valide), mais logue une erreur bien visible.
|
||||
"""
|
||||
|
||||
global _LAST_RENEWAL_TIME
|
||||
current_time = time.time()
|
||||
|
||||
# 1. Vérification du cache : est-ce que le token est encore valide ?
|
||||
if current_time - _LAST_RENEWAL_TIME < TOKEN_VALIDITY * 60:
|
||||
log.debug("Le token est encore valide. Pas de renouvellement nécessaire.")
|
||||
return True
|
||||
|
||||
cmd = [OAUTH2IMAP, "--token_file", OAUTH2_TOKEN_FILE, USER1_PREFIX]
|
||||
log.info(f"Renouvellement du token OAuth2 ({USER1_PREFIX})…")
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
|
||||
if result.returncode == 0:
|
||||
log.info("Token OAuth2 renouvelé avec succès.")
|
||||
_LAST_RENEWAL_TIME = current_time
|
||||
return True
|
||||
else:
|
||||
log.error(
|
||||
f"Échec du renouvellement du token (code {result.returncode}) : "
|
||||
f"{(result.stderr or result.stdout).strip()[:300]}"
|
||||
)
|
||||
return False
|
||||
except subprocess.TimeoutExpired:
|
||||
log.error("Timeout lors du renouvellement du token OAuth2 (>60s).")
|
||||
return False
|
||||
except FileNotFoundError:
|
||||
log.error(f"Exécutable oauth2imap introuvable : {OAUTH2IMAP}")
|
||||
return False
|
||||
except Exception as e:
|
||||
log.error(f"Erreur inattendue lors du renouvellement du token : {e}")
|
||||
return False
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────
|
||||
# Mode --test
|
||||
# ─────────────────────────────────────────────
|
||||
|
||||
|
||||
def run_tests() -> bool:
|
||||
"""
|
||||
Vérifie tous les prérequis de la configuration.
|
||||
Retourne True si tout est OK, False sinon.
|
||||
"""
|
||||
ok = True
|
||||
sep = "─" * 56
|
||||
|
||||
def check(label: str, passed: bool, detail: str = ""):
|
||||
nonlocal ok
|
||||
icon = "✓" if passed else "✗"
|
||||
msg = f" {icon} {label}"
|
||||
if detail:
|
||||
msg += f" → {detail}"
|
||||
print(msg)
|
||||
if not passed:
|
||||
ok = False
|
||||
|
||||
print()
|
||||
print("═" * 56)
|
||||
print(" imapsync_daemon — test de configuration")
|
||||
print("═" * 56)
|
||||
|
||||
# ── Variables d'environnement ────────────────
|
||||
print(f"\n{sep}")
|
||||
print(" Variables d'environnement")
|
||||
print(sep)
|
||||
env_checks = [
|
||||
("GRIST_API_KEY", GRIST_API_KEY, None),
|
||||
("GRIST_DOC_ID", GRIST_DOC_ID, None),
|
||||
("GRIST_BASE_URL", GRIST_BASE_URL, GRIST_BASE_URL),
|
||||
("HOST1", HOST1, HOST1),
|
||||
("HOST2", HOST2, HOST2),
|
||||
("USER1_PREFIX", USER1_PREFIX, USER1_PREFIX),
|
||||
("USER2_PREFIX", USER2_PREFIX, USER2_PREFIX),
|
||||
("OAUTH2_TOKEN_FILE", OAUTH2_TOKEN_FILE, OAUTH2_TOKEN_FILE),
|
||||
("PASS2", PASS2, "défini"),
|
||||
("IMAPSYNC", IMAPSYNC, IMAPSYNC),
|
||||
("OAUTH2IMAP", OAUTH2IMAP, OAUTH2IMAP),
|
||||
("TMPDIR_CACHE", TMPDIR_CACHE, TMPDIR_CACHE),
|
||||
("N_WORKERS", str(N_WORKERS), str(N_WORKERS)),
|
||||
("POLL_INTERVAL", str(POLL_INTERVAL), f"{POLL_INTERVAL}s"),
|
||||
("NEW_RATIO", str(NEW_RATIO), f"{NEW_RATIO:.0%}"),
|
||||
]
|
||||
for name, val, shown in env_checks:
|
||||
set_ok = bool(val)
|
||||
check(name, set_ok, shown if set_ok else "MANQUANTE")
|
||||
|
||||
# ── Fichiers et répertoires ──────────────────
|
||||
print(f"\n{sep}")
|
||||
print(" Fichiers et répertoires")
|
||||
print(sep)
|
||||
|
||||
for exe_name, exe_path in [("imapsync", IMAPSYNC), ("oauth2imap", OAUTH2IMAP)]:
|
||||
p = Path(exe_path)
|
||||
found = p.exists() and os.access(p, os.X_OK)
|
||||
check(
|
||||
f"{exe_name} exécutable",
|
||||
found,
|
||||
str(p) + (" (trouvé)" if found else " (introuvable)"),
|
||||
)
|
||||
|
||||
token_p = Path(OAUTH2_TOKEN_FILE)
|
||||
check(
|
||||
"OAUTH2_TOKEN_FILE",
|
||||
token_p.exists() and token_p.is_file(),
|
||||
str(token_p) + (" (trouvé)" if token_p.exists() else " (introuvable)"),
|
||||
)
|
||||
|
||||
tmpdir_p = Path(TMPDIR_CACHE)
|
||||
if tmpdir_p.exists():
|
||||
check("TMPDIR_CACHE", True, f"{tmpdir_p} (existe)")
|
||||
else:
|
||||
try:
|
||||
tmpdir_p.mkdir(parents=True, exist_ok=True)
|
||||
check("TMPDIR_CACHE", True, f"{tmpdir_p} (créé)")
|
||||
except OSError as e:
|
||||
check("TMPDIR_CACHE", False, str(e))
|
||||
|
||||
# ── Renouvellement token ─────────────────────
|
||||
print(f"\n{sep}")
|
||||
print(" Renouvellement token OAuth2")
|
||||
print(sep)
|
||||
token_ok = renew_oauth2_token()
|
||||
check(
|
||||
"oauth2imap --token_file … renew",
|
||||
token_ok,
|
||||
"succès" if token_ok else "échec — vérifier oauth2imap et le token",
|
||||
)
|
||||
|
||||
# ── Connectivité Grist ───────────────────────
|
||||
print(f"\n{sep}")
|
||||
print(" Connectivité Grist")
|
||||
print(sep)
|
||||
|
||||
grist_headers = {
|
||||
"Authorization": f"Bearer {GRIST_API_KEY}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
try:
|
||||
resp = requests.get(
|
||||
f"{GRIST_BASE_URL}/api/docs/{GRIST_DOC_ID}",
|
||||
headers=grist_headers,
|
||||
timeout=10,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
doc_name = resp.json().get("name", "?")
|
||||
check("Accès document Grist", True, f'"{doc_name}"')
|
||||
except requests.RequestException as e:
|
||||
check("Accès document Grist", False, str(e))
|
||||
|
||||
for table in (TABLE_BALS, TABLE_HISTORIQUE):
|
||||
try:
|
||||
resp = requests.get(
|
||||
f"{GRIST_BASE_URL}/api/docs/{GRIST_DOC_ID}/tables/{table}/records",
|
||||
headers=grist_headers,
|
||||
timeout=10,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
nb = len(resp.json().get("records", []))
|
||||
|
||||
if table == TABLE_BALS and nb > 0:
|
||||
sample = resp.json()["records"][0]["fields"]
|
||||
has_nb_syncs = COL_NB_SYNCS in sample
|
||||
detail = (
|
||||
f"{nb} enregistrements"
|
||||
if has_nb_syncs
|
||||
else f"{nb} enregistrements ⚠ colonne '{COL_NB_SYNCS}' absente — à créer dans Grist"
|
||||
)
|
||||
check(f"Lecture table {table}", True, detail)
|
||||
else:
|
||||
check(f"Lecture table {table}", True, f"{nb} enregistrements")
|
||||
except requests.RequestException as e:
|
||||
check(f"Lecture table {table}", False, str(e))
|
||||
|
||||
grist_add_historique("mathieu.maura@sdis66.fr", "Test", 0, 10, "Test")
|
||||
|
||||
# ── Paramètres calculés ──────────────────────
|
||||
print(f"\n{sep}")
|
||||
print(" Paramètres calculés")
|
||||
print(sep)
|
||||
slots_new = math.ceil(N_WORKERS * NEW_RATIO)
|
||||
slots_rep = N_WORKERS - slots_new
|
||||
print(f" • N_WORKERS={N_WORKERS} NEW_RATIO={NEW_RATIO:.0%}")
|
||||
print(f" → {slots_new} slot(s) nouvelles BAL")
|
||||
print(f" → {slots_rep} slot(s) re-passes")
|
||||
if ARGS.dry:
|
||||
print(" • Mode --dry actif : imapsync sera lancé avec --dry")
|
||||
|
||||
# ── Résumé ───────────────────────────────────
|
||||
print()
|
||||
print("═" * 56)
|
||||
if ok:
|
||||
print(" Résultat : OK — configuration valide")
|
||||
else:
|
||||
print(" Résultat : ERREUR — corriger les points marqués ✗")
|
||||
print("═" * 56)
|
||||
print()
|
||||
return ok
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────
|
||||
# Grist API
|
||||
# ─────────────────────────────────────────────
|
||||
|
||||
GRIST_HEADERS = {
|
||||
"Authorization": f"Bearer {GRIST_API_KEY}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
|
||||
def grist_url(table: str) -> str:
|
||||
return f"{GRIST_BASE_URL}/api/docs/{GRIST_DOC_ID}/tables/{table}/records"
|
||||
|
||||
|
||||
def grist_fetch_bals() -> list[dict]:
|
||||
"""
|
||||
Retourne les BAL où Synchronisation = True.
|
||||
Trié par 'Date_derniere_passe' (le plus vieux en premier).
|
||||
Exclut les BAL dont la dernière passe a eu lieu il y a moins de MIN_INTERVAL_MINUTES.
|
||||
"""
|
||||
try:
|
||||
# 1. Récupération triée via l'API Grist
|
||||
params = {"sort": "Date_derniere_passe"}
|
||||
resp = requests.get(
|
||||
grist_url(TABLE_BALS), headers=GRIST_HEADERS, params=params, timeout=15
|
||||
)
|
||||
resp.raise_for_status()
|
||||
records = resp.json().get("records", [])
|
||||
|
||||
# 2. On récupère le timestamp actuel en secondes
|
||||
current_timestamp = time.time()
|
||||
filtered_bals = []
|
||||
|
||||
for r in records:
|
||||
fields = r["fields"]
|
||||
|
||||
# Vérification de la condition de synchronisation globale
|
||||
if fields.get(COL_SYNC) is not True:
|
||||
continue
|
||||
|
||||
# Récupération du timestamp Grist (ex: 1779124323.67)
|
||||
last_pass_timestamp = fields.get("Date_derniere_passe")
|
||||
|
||||
# Si le timestamp existe, on valide l'écart de temps
|
||||
if last_pass_timestamp is not None:
|
||||
try:
|
||||
# Calcul du temps écoulé depuis la dernière passe
|
||||
elapsed_time = current_timestamp - float(last_pass_timestamp)
|
||||
|
||||
# Si l'écart est inférieur à 15 minutes (900 secondes), on l'exclut
|
||||
if elapsed_time < MIN_INTERVAL_MINUTES * 60:
|
||||
log.debug(
|
||||
f"BAL {r['id']} ignorée : passée il y a seulement {int(elapsed_time // 60)} min."
|
||||
)
|
||||
continue
|
||||
except (ValueError, TypeError):
|
||||
log.warning(
|
||||
f"Format de timestamp invalide pour la BAL {r['id']} : {last_pass_timestamp}. "
|
||||
f"Incluse par sécurité."
|
||||
)
|
||||
|
||||
# Si pas de date (jamais passée) ou si assez ancienne, on l'ajoute
|
||||
filtered_bals.append({"id": r["id"], **fields})
|
||||
|
||||
return filtered_bals
|
||||
|
||||
except requests.RequestException as e:
|
||||
log.error(f"Erreur lecture Grist BALS : {e}")
|
||||
return []
|
||||
|
||||
|
||||
def grist_add_historique(
|
||||
email: str,
|
||||
status: str,
|
||||
nb_emails: int,
|
||||
duree_sec: int,
|
||||
log_text: str,
|
||||
dry: bool = False,
|
||||
):
|
||||
"""Ajoute un enregistrement dans la table HISTORIQUE."""
|
||||
final_status = ("dry:" + status) if dry else status
|
||||
payload = {
|
||||
"records": [
|
||||
{
|
||||
"fields": {
|
||||
"Bal": email,
|
||||
"Status": final_status,
|
||||
"Nb_emails": nb_emails,
|
||||
"Duree_sec": duree_sec,
|
||||
"Log": log_text[:4000],
|
||||
"Date": datetime.now(timezone.utc).isoformat(),
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
# print(f" → payload: {payload}")
|
||||
try:
|
||||
resp = requests.post(
|
||||
grist_url(TABLE_HISTORIQUE), headers=GRIST_HEADERS, json=payload, timeout=15
|
||||
)
|
||||
resp.raise_for_status()
|
||||
log.info(f"[{email}] Résultat écrit dans HISTORIQUE ({final_status})")
|
||||
except requests.RequestException as e:
|
||||
log.error(f"[{email}] Erreur écriture Grist HISTORIQUE : {e} \n {resp.text}")
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────
|
||||
# Partition nouvelles BAL / re-passes
|
||||
# ─────────────────────────────────────────────
|
||||
|
||||
|
||||
def partition_bals(rows: list[dict], available: int) -> list[dict]:
|
||||
"""
|
||||
Répartit les slots disponibles (`available`) entre nouvelles BAL et re-passes.
|
||||
|
||||
- NB_RESERVE slots sont strictement exclusifs aux re-passes.
|
||||
- Le reste des slots est réparti selon le NEW_RATIO.
|
||||
- Si une file est vide, ses slots standards basculent sur l'autre.
|
||||
- La réserve des repasses ne bascule JAMAIS vers les nouvelles.
|
||||
"""
|
||||
# On borne les slots disponibles par la capacité maximale de notre système
|
||||
slots_dispos = min(available, N_WORKERS)
|
||||
if slots_dispos <= 0:
|
||||
return []
|
||||
|
||||
# 1. Séparation des deux files
|
||||
nouvelles = [r for r in rows if not r.get(COL_NB_SYNCS)]
|
||||
repasses = [r for r in rows if r.get(COL_NB_SYNCS)]
|
||||
|
||||
# 2. Isolation de la réserve exclusive
|
||||
# Si on a moins de slots dispos que la réserve demandée, la réserve prend tout
|
||||
slots_reserve_exclusive = min(NB_RESERVE, slots_dispos)
|
||||
slots_partageables = slots_dispos - slots_reserve_exclusive
|
||||
|
||||
# 3. Répartition théorique des slots partageables selon le NEW_RATIO
|
||||
quota_new = math.ceil(slots_partageables * NEW_RATIO)
|
||||
quota_rep_standard = slots_partageables - quota_new
|
||||
|
||||
# Le quota total théorique des repasses inclut sa réserve exclusive
|
||||
quota_rep_total = quota_rep_standard + slots_reserve_exclusive
|
||||
|
||||
# 4. Premier round de sélection (dans la limite des éléments disponibles)
|
||||
picked_new = nouvelles[:quota_new]
|
||||
picked_rep = repasses[:quota_rep_total]
|
||||
|
||||
# 5. Gestion des surplus (Bascule asymétrique)
|
||||
surplus_new = quota_new - len(picked_new)
|
||||
# Le surplus des repasses ne peut venir QUE de leur quota standard, pas de la réserve
|
||||
surplus_rep_standard = max(0, quota_rep_standard - len(picked_rep))
|
||||
|
||||
if surplus_new > 0:
|
||||
# Les nouvelles n'en veulent plus -> tout le rab va aux repasses
|
||||
picked_rep += repasses[quota_rep_total : quota_rep_total + surplus_new]
|
||||
|
||||
if surplus_rep_standard > 0:
|
||||
# Les repasses n'ont pas assez de jobs pour leur quota standard -> le rab va aux nouvelles
|
||||
picked_new += nouvelles[quota_new : quota_new + surplus_rep_standard]
|
||||
|
||||
# 6. Sécurité : Les nouvelles ne doivent JAMAIS empiéter sur la réserve exclusive
|
||||
assert len(picked_new) <= slots_partageables
|
||||
assert len(picked_new) + len(picked_rep) <= slots_dispos
|
||||
|
||||
# 7. Intercalage des jobs (Alternative au zip pour garder l'ordre un par un)
|
||||
result = []
|
||||
i, j = 0, 0
|
||||
while i < len(picked_new) or j < len(picked_rep):
|
||||
if i < len(picked_new):
|
||||
result.append(picked_new[i])
|
||||
i += 1
|
||||
if j < len(picked_rep):
|
||||
result.append(picked_rep[j])
|
||||
j += 1
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────
|
||||
# Parse de la sortie imapsync
|
||||
# ─────────────────────────────────────────────
|
||||
|
||||
|
||||
def parse_imapsync_output(output: str) -> dict:
|
||||
nb_emails = 0
|
||||
status = "success"
|
||||
error_msg = ""
|
||||
|
||||
m = re.search(r"Messages transferred :\s+(\d+)$", output, re.IGNORECASE)
|
||||
if m:
|
||||
nb_emails = int(m.group(1))
|
||||
|
||||
if re.search(r"Exiting with return value\s+0", output):
|
||||
status = "success"
|
||||
elif re.search(r"Exiting with return value\s+[^0]", output):
|
||||
status = "error"
|
||||
|
||||
err_match = re.search(
|
||||
r"^(Error|Err|FATAL):?\s+(.+)$", output, re.MULTILINE | re.IGNORECASE
|
||||
)
|
||||
if err_match:
|
||||
error_msg = err_match.group(2).strip()
|
||||
|
||||
return {"nb_emails": nb_emails, "status": status, "error_msg": error_msg}
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────
|
||||
# Commande imapsync
|
||||
# ─────────────────────────────────────────────
|
||||
|
||||
|
||||
def build_imapsync_cmd(email: str, dry: bool = False) -> list[str]:
|
||||
cmd = [
|
||||
IMAPSYNC,
|
||||
"--host1",
|
||||
HOST1,
|
||||
"--user1",
|
||||
email,
|
||||
"--authuser1",
|
||||
USER1_PREFIX,
|
||||
"--oauthaccesstoken1",
|
||||
OAUTH2_TOKEN_FILE,
|
||||
"--host2",
|
||||
HOST2,
|
||||
"--user2",
|
||||
email,
|
||||
"--authuser2",
|
||||
USER2_PREFIX,
|
||||
"--password2",
|
||||
PASS2,
|
||||
"--nofoldersizes",
|
||||
"--noreleasecheck",
|
||||
"--office1",
|
||||
"--automap",
|
||||
"--delete2duplicates",
|
||||
"--addheader",
|
||||
"--tmpdir",
|
||||
TMPDIR_CACHE,
|
||||
"--usecache",
|
||||
"--exclude",
|
||||
r"^Calendrier(/|$)",
|
||||
"--exclude",
|
||||
r"^Contacts(/|$)",
|
||||
"--exclude",
|
||||
r"^Boîte d'envoi(/|$)",
|
||||
"--exclude",
|
||||
r"^Flux RSS(/|$)",
|
||||
"--exclude",
|
||||
r"^Historique des conversations(/|$)",
|
||||
"--exclude",
|
||||
r"^Journal(/|$)",
|
||||
"--exclude",
|
||||
r"^Notes(/|$)",
|
||||
"--exclude",
|
||||
r"^Probl&AOg-mes de synchronisation(/|$)",
|
||||
"--exclude",
|
||||
r"^R&AOk-p&AOk-t&AOk-(/|$)",
|
||||
"--exclude",
|
||||
r"^T&AOI-ches(/|$)",
|
||||
"--exclude",
|
||||
r"^Contacts sugg&AOk-r&AOk-s(/|$)",
|
||||
"--f1f2",
|
||||
"Courrier indésirable=Pourriel",
|
||||
]
|
||||
for year in range(2000, 2027):
|
||||
cmd += ["--f1f2", f"Archive/{year}=Archives.{year}"]
|
||||
cmd += ["--f1f2", "Archive=Archives"]
|
||||
|
||||
if dry:
|
||||
cmd.append("--dry")
|
||||
|
||||
return cmd
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────
|
||||
# Worker : exécute un job imapsync
|
||||
# ─────────────────────────────────────────────
|
||||
|
||||
_running: set[str] = set()
|
||||
|
||||
|
||||
def run_sync_job(row: dict, dry: bool = False) -> None:
|
||||
email = row.get(COL_EMAIL, "").strip()
|
||||
if not email:
|
||||
log.warning(f"Ligne {row.get('id')} sans courriel, ignorée.")
|
||||
return
|
||||
if email in _running:
|
||||
log.info(f"[{email}] Déjà en cours, skip.")
|
||||
return
|
||||
|
||||
_running.add(email)
|
||||
dry_tag = " [DRY]" if dry else ""
|
||||
log.info(f"[{email}] Démarrage{dry_tag}")
|
||||
|
||||
cmd = build_imapsync_cmd(email, dry=dry)
|
||||
start = time.monotonic()
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd, capture_output=True, text=True, timeout=IMAPSYNC_TIMEOUT
|
||||
)
|
||||
output = result.stdout + "\n" + result.stderr
|
||||
elapsed = int(time.monotonic() - start)
|
||||
parsed = parse_imapsync_output(output)
|
||||
|
||||
if result.returncode != 0 and parsed["status"] == "success":
|
||||
parsed["status"] = "error"
|
||||
parsed["error_msg"] = f"Exit code {result.returncode}"
|
||||
|
||||
log_text = output if parsed["status"] == "error" else result.stdout[-3000:]
|
||||
log.info(
|
||||
f"[{email}] Terminé{dry_tag} — statut={parsed['status']} "
|
||||
f"emails={parsed['nb_emails']} durée={elapsed}s"
|
||||
)
|
||||
grist_add_historique(
|
||||
email, parsed["status"], parsed["nb_emails"], elapsed, log_text, dry=dry
|
||||
)
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
elapsed = int(time.monotonic() - start)
|
||||
log.error(f"[{email}] Timeout après {elapsed}s")
|
||||
grist_add_historique(
|
||||
email, "timeout", 0, elapsed, f"Timeout {elapsed}s dépassé", dry=dry
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
elapsed = int(time.monotonic() - start)
|
||||
log.error(f"[{email}] Exception inattendue : {e}")
|
||||
grist_add_historique(email, "error", 0, elapsed, str(e), dry=dry)
|
||||
|
||||
finally:
|
||||
_running.discard(email)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────
|
||||
# Boucle principale
|
||||
# ─────────────────────────────────────────────
|
||||
|
||||
_shutdown = False
|
||||
|
||||
|
||||
def handle_signal(signum, frame):
|
||||
global _shutdown
|
||||
log.info(f"Signal {signum} reçu — arrêt propre en cours…")
|
||||
_shutdown = True
|
||||
|
||||
|
||||
signal.signal(signal.SIGTERM, handle_signal)
|
||||
signal.signal(signal.SIGINT, handle_signal)
|
||||
|
||||
|
||||
def main():
|
||||
dry = ARGS.dry
|
||||
log.info("═" * 60)
|
||||
log.info(
|
||||
f"imapsync daemon démarré — workers={N_WORKERS} "
|
||||
f"poll={POLL_INTERVAL}s new_ratio={NEW_RATIO:.0%}"
|
||||
+ (" [MODE DRY]" if dry else "")
|
||||
)
|
||||
log.info(f"Source : {HOST1} → Destination : {HOST2}")
|
||||
log.info("═" * 60)
|
||||
|
||||
Path(TMPDIR_CACHE).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
with ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
|
||||
futures: dict = {}
|
||||
|
||||
while not _shutdown:
|
||||
# Nettoyage des futures terminées
|
||||
done = [f for f in list(futures) if f.done()]
|
||||
for f in done:
|
||||
email = futures.pop(f)
|
||||
try:
|
||||
f.result()
|
||||
except Exception as e:
|
||||
log.error(f"[{email}] Erreur non gérée : {e}")
|
||||
|
||||
# Renouvellement du token à chaque cycle de polling
|
||||
renew_oauth2_token()
|
||||
|
||||
# Remplissage des slots disponibles
|
||||
available = N_WORKERS - len(futures)
|
||||
if available > 0:
|
||||
rows = grist_fetch_bals()
|
||||
rows = [r for r in rows if r.get(COL_EMAIL, "").strip() not in _running]
|
||||
for row in partition_bals(rows, available):
|
||||
email = row.get(COL_EMAIL, "").strip()
|
||||
f = pool.submit(run_sync_job, row, dry)
|
||||
futures[f] = email
|
||||
log.info(f"[{email}] Job soumis au pool")
|
||||
|
||||
if not _shutdown:
|
||||
time.sleep(POLL_INTERVAL)
|
||||
|
||||
log.info("Daemon arrêté proprement.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if ARGS.test:
|
||||
sys.exit(0 if run_tests() else 1)
|
||||
main()
|
||||
Reference in New Issue
Block a user