Files
office365_2_dimail/imapsync_daemon.py
T

809 lines
29 KiB
Python
Executable File

#!/usr/bin/env python3
"""
imapsync_daemon.py
──────────────────
Daemon d'orchestration de migrations IMAP depuis Grist.
Fonctionnement :
1. Renouvellement du token OAuth2 (au démarrage et à chaque cycle)
2. Polling de la table BALS dans Grist (colonne Synchronisation = True)
3. Partition nouvelles BAL / re-passes selon NEW_RATIO
4. Lancement de N workers imapsync en parallèle
5. Parse de la sortie imapsync (nb emails, statut, erreurs)
6. Écriture des résultats dans la table HISTORIQUE de Grist
Modes de lancement :
(aucun argument) Mode daemon normal
--dry Ajoute --dry à imapsync (simulation, aucune écriture IMAP)
--test Vérifie toutes les variables d'environnement, la connectivité
Grist et la présence d'imapsync, puis quitte (0=OK, 1=erreur)
Variables d'environnement requises (fichier .env ou export shell) :
GRIST_API_KEY Clé API Grist
GRIST_DOC_ID ID du document Grist
GRIST_BASE_URL URL de base Grist (ex: https://grist.example.fr)
HOST1 Serveur IMAP source
HOST2 Serveur IMAP destination
USER1_PREFIX Compte admin source (authuser1 / identité OAuth2)
USER2_PREFIX Compte admin destination (authuser2)
OAUTH2_TOKEN_FILE Chemin vers le fichier de token OAuth2 source
PASS2 Mot de passe admin destination
IMAPSYNC Chemin vers l'exécutable imapsync (défaut: /usr/bin/imapsync)
OAUTH2IMAP Chemin vers l'exécutable oauth2imap (défaut: /usr/bin/oauth2imap)
TMPDIR_CACHE Répertoire cache imapsync (défaut: /tmp/imapsync_cache)
N_WORKERS Nombre de synchros parallèles (défaut: 4)
POLL_INTERVAL Intervalle de polling en secondes (défaut: 30)
NEW_RATIO Part des slots réservés aux nouvelles BAL (défaut: 0.20)
"""
import argparse
import logging
import math
import os
import re
import signal
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from pathlib import Path
import requests
# ─────────────────────────────────────────────
# Arguments CLI
# ─────────────────────────────────────────────
parser = argparse.ArgumentParser(
description="Daemon imapsync piloté par Grist",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--dry",
action="store_true",
help="Passe --dry à imapsync : simulation sans écriture sur le serveur destination",
)
parser.add_argument(
"--test",
action="store_true",
help="Vérifie la configuration et la connectivité, puis quitte (0=OK, 1=erreur)",
)
ARGS = parser.parse_args()
# ─────────────────────────────────────────────
# Configuration depuis l'environnement
# ─────────────────────────────────────────────
def require_env(name: str) -> str:
val = os.environ.get(name)
if not val:
logging.critical(f"Variable d'environnement manquante : {name}")
sys.exit(1)
return val
def opt_env(name: str, default: str) -> str:
return os.environ.get(name, default)
def load_dotenv(path: str = ".env"):
"""Chargement .env simple sans dépendance externe."""
p = Path(path)
if p.exists():
for line in p.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip('"').strip("'"))
load_dotenv()
GRIST_API_KEY = require_env("GRIST_API_KEY")
GRIST_DOC_ID = require_env("GRIST_DOC_ID")
GRIST_BASE_URL = opt_env("GRIST_BASE_URL", "http://localhost:8484")
HOST1 = require_env("HOST1")
HOST2 = require_env("HOST2")
USER1_PREFIX = require_env("USER1_PREFIX")
USER2_PREFIX = require_env("USER2_PREFIX")
OAUTH2_TOKEN_FILE = require_env("OAUTH2_TOKEN_FILE")
PASS2 = require_env("PASS2")
IMAPSYNC = opt_env("IMAPSYNC", "/usr/bin/imapsync")
IMAPSYNC_TIMEOUT = 72000
NB_RESERVE = 1
OAUTH2IMAP = opt_env("OAUTH2IMAP", "/usr/bin/oauth2imap")
TMPDIR_CACHE = opt_env("TMPDIR_CACHE", "/tmp/imapsync_cache")
N_WORKERS = int(opt_env("N_WORKERS", "4"))
POLL_INTERVAL = int(opt_env("POLL_INTERVAL", "30"))
NEW_RATIO = float(opt_env("NEW_RATIO", "0.20"))
# Interval minimum entre 2 vérification d'une meme BAL
MIN_INTERVAL_MINUTES = 15
# Noms des tables Grist (casse exacte telle qu'affichée dans Grist)
TABLE_BALS = "BALs"
TABLE_HISTORIQUE = "Historique"
# Colonnes Grist
COL_EMAIL = "Courriel"
COL_SYNC = "Synchronisation" # booléen déclencheur
COL_NB_SYNCS = "Nb_syncs" # compteur calculé par formule Grist
TOKEN_VALIDITY = 30 # Minutes
_LAST_RENEWAL_TIME = 0.0
# ─────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("imapsync_daemon.log", encoding="utf-8"),
],
)
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────
# Renouvellement du token OAuth2
# ─────────────────────────────────────────────
def renew_oauth2_token() -> bool:
"""
Exécute : OAUTH2IMAP --token_file OAUTH2_TOKEN_FILE USER1_PREFIX
Retourne True si le renouvellement a réussi, False sinon.
Le daemon continue même en cas d'échec (le token existant peut encore
être valide), mais logue une erreur bien visible.
"""
global _LAST_RENEWAL_TIME
current_time = time.time()
# 1. Vérification du cache : est-ce que le token est encore valide ?
if current_time - _LAST_RENEWAL_TIME < TOKEN_VALIDITY * 60:
log.debug("Le token est encore valide. Pas de renouvellement nécessaire.")
return True
cmd = [OAUTH2IMAP, "--token_file", OAUTH2_TOKEN_FILE, USER1_PREFIX]
log.info(f"Renouvellement du token OAuth2 ({USER1_PREFIX})…")
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
log.info("Token OAuth2 renouvelé avec succès.")
_LAST_RENEWAL_TIME = current_time
return True
else:
log.error(
f"Échec du renouvellement du token (code {result.returncode}) : "
f"{(result.stderr or result.stdout).strip()[:300]}"
)
return False
except subprocess.TimeoutExpired:
log.error("Timeout lors du renouvellement du token OAuth2 (>60s).")
return False
except FileNotFoundError:
log.error(f"Exécutable oauth2imap introuvable : {OAUTH2IMAP}")
return False
except Exception as e:
log.error(f"Erreur inattendue lors du renouvellement du token : {e}")
return False
# ─────────────────────────────────────────────
# Mode --test
# ─────────────────────────────────────────────
def run_tests() -> bool:
"""
Vérifie tous les prérequis de la configuration.
Retourne True si tout est OK, False sinon.
"""
ok = True
sep = "" * 56
def check(label: str, passed: bool, detail: str = ""):
nonlocal ok
icon = "" if passed else ""
msg = f" {icon} {label}"
if detail:
msg += f"{detail}"
print(msg)
if not passed:
ok = False
print()
print("" * 56)
print(" imapsync_daemon — test de configuration")
print("" * 56)
# ── Variables d'environnement ────────────────
print(f"\n{sep}")
print(" Variables d'environnement")
print(sep)
env_checks = [
("GRIST_API_KEY", GRIST_API_KEY, None),
("GRIST_DOC_ID", GRIST_DOC_ID, None),
("GRIST_BASE_URL", GRIST_BASE_URL, GRIST_BASE_URL),
("HOST1", HOST1, HOST1),
("HOST2", HOST2, HOST2),
("USER1_PREFIX", USER1_PREFIX, USER1_PREFIX),
("USER2_PREFIX", USER2_PREFIX, USER2_PREFIX),
("OAUTH2_TOKEN_FILE", OAUTH2_TOKEN_FILE, OAUTH2_TOKEN_FILE),
("PASS2", PASS2, "défini"),
("IMAPSYNC", IMAPSYNC, IMAPSYNC),
("OAUTH2IMAP", OAUTH2IMAP, OAUTH2IMAP),
("TMPDIR_CACHE", TMPDIR_CACHE, TMPDIR_CACHE),
("N_WORKERS", str(N_WORKERS), str(N_WORKERS)),
("POLL_INTERVAL", str(POLL_INTERVAL), f"{POLL_INTERVAL}s"),
("NEW_RATIO", str(NEW_RATIO), f"{NEW_RATIO:.0%}"),
]
for name, val, shown in env_checks:
set_ok = bool(val)
check(name, set_ok, shown if set_ok else "MANQUANTE")
# ── Fichiers et répertoires ──────────────────
print(f"\n{sep}")
print(" Fichiers et répertoires")
print(sep)
for exe_name, exe_path in [("imapsync", IMAPSYNC), ("oauth2imap", OAUTH2IMAP)]:
p = Path(exe_path)
found = p.exists() and os.access(p, os.X_OK)
check(
f"{exe_name} exécutable",
found,
str(p) + (" (trouvé)" if found else " (introuvable)"),
)
token_p = Path(OAUTH2_TOKEN_FILE)
check(
"OAUTH2_TOKEN_FILE",
token_p.exists() and token_p.is_file(),
str(token_p) + (" (trouvé)" if token_p.exists() else " (introuvable)"),
)
tmpdir_p = Path(TMPDIR_CACHE)
if tmpdir_p.exists():
check("TMPDIR_CACHE", True, f"{tmpdir_p} (existe)")
else:
try:
tmpdir_p.mkdir(parents=True, exist_ok=True)
check("TMPDIR_CACHE", True, f"{tmpdir_p} (créé)")
except OSError as e:
check("TMPDIR_CACHE", False, str(e))
# ── Renouvellement token ─────────────────────
print(f"\n{sep}")
print(" Renouvellement token OAuth2")
print(sep)
token_ok = renew_oauth2_token()
check(
"oauth2imap --token_file … renew",
token_ok,
"succès" if token_ok else "échec — vérifier oauth2imap et le token",
)
# ── Connectivité Grist ───────────────────────
print(f"\n{sep}")
print(" Connectivité Grist")
print(sep)
grist_headers = {
"Authorization": f"Bearer {GRIST_API_KEY}",
"Content-Type": "application/json",
}
try:
resp = requests.get(
f"{GRIST_BASE_URL}/api/docs/{GRIST_DOC_ID}",
headers=grist_headers,
timeout=10,
)
resp.raise_for_status()
doc_name = resp.json().get("name", "?")
check("Accès document Grist", True, f'"{doc_name}"')
except requests.RequestException as e:
check("Accès document Grist", False, str(e))
for table in (TABLE_BALS, TABLE_HISTORIQUE):
try:
resp = requests.get(
f"{GRIST_BASE_URL}/api/docs/{GRIST_DOC_ID}/tables/{table}/records",
headers=grist_headers,
timeout=10,
)
resp.raise_for_status()
nb = len(resp.json().get("records", []))
if table == TABLE_BALS and nb > 0:
sample = resp.json()["records"][0]["fields"]
has_nb_syncs = COL_NB_SYNCS in sample
detail = (
f"{nb} enregistrements"
if has_nb_syncs
else f"{nb} enregistrements ⚠ colonne '{COL_NB_SYNCS}' absente — à créer dans Grist"
)
check(f"Lecture table {table}", True, detail)
else:
check(f"Lecture table {table}", True, f"{nb} enregistrements")
except requests.RequestException as e:
check(f"Lecture table {table}", False, str(e))
grist_add_historique("mathieu.maura@sdis66.fr", "Test", 0, 10, "Test")
# ── Paramètres calculés ──────────────────────
print(f"\n{sep}")
print(" Paramètres calculés")
print(sep)
slots_new = math.ceil(N_WORKERS * NEW_RATIO)
slots_rep = N_WORKERS - slots_new
print(f" • N_WORKERS={N_WORKERS} NEW_RATIO={NEW_RATIO:.0%}")
print(f"{slots_new} slot(s) nouvelles BAL")
print(f"{slots_rep} slot(s) re-passes")
if ARGS.dry:
print(" • Mode --dry actif : imapsync sera lancé avec --dry")
# ── Résumé ───────────────────────────────────
print()
print("" * 56)
if ok:
print(" Résultat : OK — configuration valide")
else:
print(" Résultat : ERREUR — corriger les points marqués ✗")
print("" * 56)
print()
return ok
# ─────────────────────────────────────────────
# Grist API
# ─────────────────────────────────────────────
GRIST_HEADERS = {
"Authorization": f"Bearer {GRIST_API_KEY}",
"Content-Type": "application/json",
}
def grist_url(table: str) -> str:
return f"{GRIST_BASE_URL}/api/docs/{GRIST_DOC_ID}/tables/{table}/records"
def grist_fetch_bals() -> list[dict]:
"""
Retourne les BAL où Synchronisation = True.
Trié par 'Date_derniere_passe' (le plus vieux en premier).
Exclut les BAL dont la dernière passe a eu lieu il y a moins de MIN_INTERVAL_MINUTES.
"""
try:
# 1. Récupération triée via l'API Grist
params = {"sort": "Date_derniere_passe"}
resp = requests.get(
grist_url(TABLE_BALS), headers=GRIST_HEADERS, params=params, timeout=15
)
resp.raise_for_status()
records = resp.json().get("records", [])
# 2. On récupère le timestamp actuel en secondes
current_timestamp = time.time()
filtered_bals = []
for r in records:
fields = r["fields"]
# Vérification de la condition de synchronisation globale
if fields.get(COL_SYNC) is not True:
continue
# Récupération du timestamp Grist (ex: 1779124323.67)
last_pass_timestamp = fields.get("Date_derniere_passe")
# Si le timestamp existe, on valide l'écart de temps
if last_pass_timestamp is not None:
try:
# Calcul du temps écoulé depuis la dernière passe
elapsed_time = current_timestamp - float(last_pass_timestamp)
# Si l'écart est inférieur à 15 minutes (900 secondes), on l'exclut
if elapsed_time < MIN_INTERVAL_MINUTES * 60:
log.debug(
f"BAL {r['id']} ignorée : passée il y a seulement {int(elapsed_time // 60)} min."
)
continue
except (ValueError, TypeError):
log.warning(
f"Format de timestamp invalide pour la BAL {r['id']} : {last_pass_timestamp}. "
f"Incluse par sécurité."
)
# Si pas de date (jamais passée) ou si assez ancienne, on l'ajoute
filtered_bals.append({"id": r["id"], **fields})
return filtered_bals
except requests.RequestException as e:
log.error(f"Erreur lecture Grist BALS : {e}")
return []
def grist_priority_bals() -> dict:
"""
Retourne les BAL où Synchronisation = True.
Trié par 'Date_derniere_passe' (le plus vieux en premier).
Exclut les BAL dont la dernière passe a eu lieu il y a moins de MIN_INTERVAL_MINUTES.
"""
try:
# 1. Récupération triée via l'API Grist
params = {"limit": 1, "filter": {"priority": [True], COL_SYNC: [True]}}
resp = requests.get(
grist_url(TABLE_BALS), headers=GRIST_HEADERS, params=params, timeout=15
)
resp.raise_for_status()
records = resp.json().get("records", [])
# for r in records:
r = records[0]
fields = r["fields"]
# Vérification de la condition de synchronisation globale
if fields.get(COL_SYNC) is not True:
return {}
# Si le timestamp existe, on valide l'écart de temps
if fields.get("Date_derniere_passe") is not None:
return {}
return r
except requests.RequestException as e:
log.error(f"Erreur lecture Grist BALS : {e}")
return {}
def grist_add_historique(
email: str,
status: str,
nb_emails: int,
duree_sec: int,
log_text: str,
dry: bool = False,
):
"""Ajoute un enregistrement dans la table HISTORIQUE."""
final_status = ("dry:" + status) if dry else status
payload = {
"records": [
{
"fields": {
"Bal": email,
"Status": final_status,
"Nb_emails": nb_emails,
"Duree_sec": duree_sec,
"Log": log_text[:4000],
"Date": datetime.now(timezone.utc).isoformat(),
}
}
]
}
# print(f" → payload: {payload}")
try:
resp = requests.post(
grist_url(TABLE_HISTORIQUE), headers=GRIST_HEADERS, json=payload, timeout=15
)
resp.raise_for_status()
log.info(f"[{email}] Résultat écrit dans HISTORIQUE ({final_status})")
except requests.RequestException as e:
log.error(f"[{email}] Erreur écriture Grist HISTORIQUE : {e} \n {resp.text}")
# ─────────────────────────────────────────────
# Partition nouvelles BAL / re-passes
# ─────────────────────────────────────────────
def partition_bals(rows: list[dict], available: int) -> list[dict]:
"""
Répartit les slots disponibles (`available`) entre nouvelles BAL et re-passes.
- NB_RESERVE slots sont strictement exclusifs aux re-passes.
- Le reste des slots est réparti selon le NEW_RATIO.
- Si une file est vide, ses slots standards basculent sur l'autre.
- La réserve des repasses ne bascule JAMAIS vers les nouvelles.
"""
# On borne les slots disponibles par la capacité maximale de notre système
slots_dispos = min(available, N_WORKERS)
if slots_dispos <= 0:
return []
# 1. Séparation des deux files
nouvelles = [r for r in rows if not r.get(COL_NB_SYNCS)]
repasses = [r for r in rows if r.get(COL_NB_SYNCS)]
# 2. Isolation de la réserve exclusive
# Si on a moins de slots dispos que la réserve demandée, la réserve prend tout
slots_reserve_exclusive = min(NB_RESERVE, slots_dispos)
slots_partageables = slots_dispos - slots_reserve_exclusive
# 3. Répartition théorique des slots partageables selon le NEW_RATIO
quota_new = math.ceil(slots_partageables * NEW_RATIO)
quota_rep_standard = slots_partageables - quota_new
# Le quota total théorique des repasses inclut sa réserve exclusive
quota_rep_total = quota_rep_standard + slots_reserve_exclusive
# 4. Premier round de sélection (dans la limite des éléments disponibles)
picked_new = nouvelles[:quota_new]
picked_rep = repasses[:quota_rep_total]
# 5. Gestion des surplus (Bascule asymétrique)
surplus_new = quota_new - len(picked_new)
# Le surplus des repasses ne peut venir QUE de leur quota standard, pas de la réserve
surplus_rep_standard = max(0, quota_rep_standard - len(picked_rep))
if surplus_new > 0:
# Les nouvelles n'en veulent plus -> tout le rab va aux repasses
picked_rep += repasses[quota_rep_total : quota_rep_total + surplus_new]
if surplus_rep_standard > 0:
# Les repasses n'ont pas assez de jobs pour leur quota standard -> le rab va aux nouvelles
picked_new += nouvelles[quota_new : quota_new + surplus_rep_standard]
# 6. Sécurité : Les nouvelles ne doivent JAMAIS empiéter sur la réserve exclusive
assert len(picked_new) <= slots_partageables
assert len(picked_new) + len(picked_rep) <= slots_dispos
# 7. Intercalage des jobs (Alternative au zip pour garder l'ordre un par un)
result = []
i, j = 0, 0
while i < len(picked_new) or j < len(picked_rep):
if i < len(picked_new):
result.append(picked_new[i])
i += 1
if j < len(picked_rep):
result.append(picked_rep[j])
j += 1
return result
# ─────────────────────────────────────────────
# Parse de la sortie imapsync
# ─────────────────────────────────────────────
def parse_imapsync_output(output: str) -> dict:
nb_emails = 0
status = "success"
error_msg = ""
m = re.search(r"Messages transferred :\s+(\d+)$", output, re.IGNORECASE)
if m:
nb_emails = int(m.group(1))
if re.search(r"Exiting with return value\s+0", output):
status = "success"
elif re.search(r"Exiting with return value\s+[^0]", output):
status = "error"
err_match = re.search(
r"^(Error|Err|FATAL):?\s+(.+)$", output, re.MULTILINE | re.IGNORECASE
)
if err_match:
error_msg = err_match.group(2).strip()
return {"nb_emails": nb_emails, "status": status, "error_msg": error_msg}
# ─────────────────────────────────────────────
# Commande imapsync
# ─────────────────────────────────────────────
def build_imapsync_cmd(email: str, dry: bool = False) -> list[str]:
cmd = [
IMAPSYNC,
"--host1",
HOST1,
"--user1",
email,
"--authuser1",
USER1_PREFIX,
"--oauthaccesstoken1",
OAUTH2_TOKEN_FILE,
"--host2",
HOST2,
"--user2",
email,
"--authuser2",
USER2_PREFIX,
"--password2",
PASS2,
"--nofoldersizes",
"--noreleasecheck",
"--office1",
"--automap",
"--delete2duplicates",
"--addheader",
"--tmpdir",
TMPDIR_CACHE,
"--usecache",
"--exclude",
r"^Calendrier(/|$)",
"--exclude",
r"^Contacts(/|$)",
"--exclude",
r"^Boîte d'envoi(/|$)",
"--exclude",
r"^Flux RSS(/|$)",
"--exclude",
r"^Historique des conversations(/|$)",
"--exclude",
r"^Journal(/|$)",
"--exclude",
r"^Notes(/|$)",
"--exclude",
r"^Probl&AOg-mes de synchronisation(/|$)",
"--exclude",
r"^R&AOk-p&AOk-t&AOk-(/|$)",
"--exclude",
r"^T&AOI-ches(/|$)",
"--exclude",
r"^Contacts sugg&AOk-r&AOk-s(/|$)",
"--f1f2",
"Courrier indésirable=Pourriel",
]
for year in range(2000, 2027):
cmd += ["--f1f2", f"Archive/{year}=Archives.{year}"]
cmd += ["--f1f2", "Archive=Archives"]
if dry:
cmd.append("--dry")
return cmd
# ─────────────────────────────────────────────
# Worker : exécute un job imapsync
# ─────────────────────────────────────────────
_running: set[str] = set()
def run_sync_job(row: dict, dry: bool = False) -> None:
email = row.get(COL_EMAIL, "").strip()
if not email:
log.warning(f"Ligne {row.get('id')} sans courriel, ignorée.")
return
if email in _running:
log.info(f"[{email}] Déjà en cours, skip.")
return
_running.add(email)
dry_tag = " [DRY]" if dry else ""
log.info(f"[{email}] Démarrage{dry_tag}")
cmd = build_imapsync_cmd(email, dry=dry)
start = time.monotonic()
try:
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=IMAPSYNC_TIMEOUT
)
output = result.stdout + "\n" + result.stderr
elapsed = int(time.monotonic() - start)
parsed = parse_imapsync_output(output)
if result.returncode != 0 and parsed["status"] == "success":
parsed["status"] = "error"
parsed["error_msg"] = f"Exit code {result.returncode}"
log_text = output if parsed["status"] == "error" else result.stdout[-3000:]
log.info(
f"[{email}] Terminé{dry_tag} — statut={parsed['status']} "
f"emails={parsed['nb_emails']} durée={elapsed}s"
)
grist_add_historique(
email, parsed["status"], parsed["nb_emails"], elapsed, log_text, dry=dry
)
except subprocess.TimeoutExpired:
elapsed = int(time.monotonic() - start)
log.error(f"[{email}] Timeout après {elapsed}s")
grist_add_historique(
email, "timeout", 0, elapsed, f"Timeout {elapsed}s dépassé", dry=dry
)
except Exception as e:
elapsed = int(time.monotonic() - start)
log.error(f"[{email}] Exception inattendue : {e}")
grist_add_historique(email, "error", 0, elapsed, str(e), dry=dry)
finally:
_running.discard(email)
# ─────────────────────────────────────────────
# Boucle principale
# ─────────────────────────────────────────────
_shutdown = False
def handle_signal(signum, frame):
global _shutdown
log.info(f"Signal {signum} reçu — arrêt propre en cours…")
_shutdown = True
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
def main():
dry = ARGS.dry
log.info("" * 60)
log.info(
f"imapsync daemon démarré — workers={N_WORKERS} "
f"poll={POLL_INTERVAL}s new_ratio={NEW_RATIO:.0%}"
+ (" [MODE DRY]" if dry else "")
)
log.info(f"Source : {HOST1} → Destination : {HOST2}")
log.info("" * 60)
Path(TMPDIR_CACHE).mkdir(parents=True, exist_ok=True)
with ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
futures: dict = {}
while not _shutdown:
# Nettoyage des futures terminées
done = [f for f in list(futures) if f.done()]
for f in done:
email = futures.pop(f)
try:
f.result()
except Exception as e:
log.error(f"[{email}] Erreur non gérée : {e}")
# Renouvellement du token à chaque cycle de polling
renew_oauth2_token()
# Remplissage des slots disponibles
available = N_WORKERS - len(futures)
if available > 0:
rows = grist_fetch_bals()
rows = [r for r in rows if r.get(COL_EMAIL, "").strip() not in _running]
for row in partition_bals(rows, available):
email = row.get(COL_EMAIL, "").strip()
f = pool.submit(run_sync_job, row, dry)
futures[f] = email
log.info(f"[{email}] Job soumis au pool")
# Ajout de la BAL prioritaire
row = grist_priority_bals()
email = row.get(COL_EMAIL, "").strip()
if email not in _running and email != "":
f = pool.submit(run_sync_job, row, dry)
futures[f] = email
log.info(f"[{email}] Job soumis au pool")
if not _shutdown:
time.sleep(POLL_INTERVAL)
log.info("Daemon arrêté proprement.")
if __name__ == "__main__":
if ARGS.test:
sys.exit(0 if run_tests() else 1)
main()