Files
office365_2_dimail/imapsync_daemon.py
T
2026-06-16 01:59:08 +02:00

797 lines
29 KiB
Python
Executable File

#!/usr/bin/env python3
"""
imapsync_daemon.py
──────────────────
Daemon d'orchestration de migrations IMAP depuis Grist.
Fonctionnement :
1. Renouvellement du token OAuth2 (au démarrage et à chaque cycle)
2. Polling de la table BALS dans Grist (colonne Synchronisation = True)
3. Partition nouvelles BAL / re-passes selon NEW_RATIO
4. Lancement de N workers imapsync en parallèle
5. Parse de la sortie imapsync (nb emails, statut, erreurs)
6. Écriture des résultats dans la table HISTORIQUE de Grist
Modes de lancement :
(aucun argument) Mode daemon normal
--dry Ajoute --dry à imapsync (simulation, aucune écriture IMAP)
--test Vérifie toutes les variables d'environnement, la connectivité
Grist et la présence d'imapsync, puis quitte (0=OK, 1=erreur)
Variables d'environnement requises (fichier .env ou export shell) :
GRIST_API_KEY Clé API Grist
GRIST_DOC_ID ID du document Grist
GRIST_BASE_URL URL de base Grist (ex: https://grist.example.fr)
HOST1 Serveur IMAP source
HOST2 Serveur IMAP destination
USER1_PREFIX Compte admin source (authuser1 / identité OAuth2)
USER2_PREFIX Compte admin destination (authuser2)
OAUTH2_TOKEN_FILE Chemin vers le fichier de token OAuth2 source
PASS2 Mot de passe admin destination
IMAPSYNC Chemin vers l'exécutable imapsync (défaut: /usr/bin/imapsync)
OAUTH2IMAP Chemin vers l'exécutable oauth2imap (défaut: /usr/bin/oauth2imap)
TMPDIR_CACHE Répertoire cache imapsync (défaut: /tmp/imapsync_cache)
N_WORKERS Nombre de synchros parallèles (défaut: 4)
POLL_INTERVAL Intervalle de polling en secondes (défaut: 30)
NEW_RATIO Part des slots réservés aux nouvelles BAL (défaut: 0.20)
"""
import argparse
import http
import http.client
import json
import logging
import os
import re
import signal
import subprocess
import sys
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from datetime import datetime, timezone
from pathlib import Path
import requests
# ─────────────────────────────────────────────
# Arguments CLI
# ─────────────────────────────────────────────
parser = argparse.ArgumentParser(
description="Daemon imapsync piloté par Grist",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--dry",
action="store_true",
help="Passe --dry à imapsync : simulation sans écriture sur le serveur destination",
)
parser.add_argument(
"--test",
action="store_true",
help="Vérifie la configuration et la connectivité, puis quitte (0=OK, 1=erreur)",
)
ARGS = parser.parse_args()
# ─────────────────────────────────────────────
# Configuration depuis l'environnement
# ─────────────────────────────────────────────
def require_env(name: str) -> str:
val = os.environ.get(name)
if not val:
logging.critical(f"Variable d'environnement manquante : {name}")
sys.exit(1)
return val
def opt_env(name: str, default: str) -> str:
return os.environ.get(name, default)
def load_dotenv(path: str = ".env"):
"""Chargement .env simple sans dépendance externe."""
p = Path(path)
if p.exists():
for line in p.read_text().splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
os.environ.setdefault(key.strip(), val.strip().strip('"').strip("'"))
load_dotenv()
GRIST_API_KEY = require_env("GRIST_API_KEY")
GRIST_DOC_ID = require_env("GRIST_DOC_ID")
GRIST_BASE_URL = require_env("GRIST_BASE_URL")
HOST1 = require_env("HOST1")
HOST2 = require_env("HOST2")
USER1_PREFIX = require_env("USER1_PREFIX")
USER2_PREFIX = require_env("USER2_PREFIX")
OAUTH2_TOKEN_FILE = require_env("OAUTH2_TOKEN_FILE")
PASS2 = require_env("PASS2")
IMAPSYNC = opt_env("IMAPSYNC", "/usr/bin/imapsync")
IMAPSYNC_TIMEOUT = 72000
# NB_RESERVE = 1
OAUTH2IMAP = opt_env("OAUTH2IMAP", "/usr/bin/oauth2imap")
TMPDIR_CACHE = opt_env("TMPDIR_CACHE", "/tmp/imapsync_cache")
POLL_INTERVAL = int(opt_env("POLL_INTERVAL", "30"))
NB_CONCURENT_NEW_BAL = int(opt_env("NB_CONCURENT_NEW_BAL", "3"))
NB_CONCURENT_REPASS_BAL = int(opt_env("NB_CONCURENT_REPASS_BAL", "3"))
NB_CONCURENT_PRIORITY_BAL = int(opt_env("NB_CONCURENT_PRIORITY_BAL", "3"))
NB_MAX_WORKERS = (
NB_CONCURENT_NEW_BAL + NB_CONCURENT_REPASS_BAL + NB_CONCURENT_PRIORITY_BAL
)
# NEW_RATIO = float(opt_env("NEW_RATIO", "0.20"))
# Interval en minute minimum entre 2 vérification d'une meme BAL
MIN_INTERVAL_MINUTES = 15
TABLE_BALS = require_env("TABLE_BALS")
COL_EMAIL = require_env("COL_EMAIL")
COL_SYNC = require_env("COL_SYNC")
COL_NB_SYNCS = require_env("COL_NB_SYNCS")
COL_PRIORITY = require_env("COL_PRIORITY")
COL_SUCCESS = require_env("COL_SUCCESS")
COL_LAST_PASS = require_env("COL_LAST_PASS")
TABLE_HISTORIQUE = require_env("TABLE_HISTORIQUE")
COL_BAL = require_env("COL_BAL")
COL_STATUS = require_env("COL_STATUS")
COL_NB_EMAILS = require_env("COL_NB_EMAILS")
COL_DUREE_SEC = require_env("COL_DUREE_SEC")
COL_LOG = require_env("COL_LOG")
COL_DATE = require_env("COL_DATE")
TOKEN_VALIDITY = 30 # Minutes
_LAST_RENEWAL_TIME = 0.0
# ─────────────────────────────────────────────
# Logging
# ─────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("imapsync_daemon.log", encoding="utf-8"),
],
)
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────
# Renouvellement du token OAuth2
# ─────────────────────────────────────────────
def renew_oauth2_token(dry: bool) -> bool:
"""
Exécute : OAUTH2IMAP --token_file OAUTH2_TOKEN_FILE USER1_PREFIX
Retourne True si le renouvellement a réussi, False sinon.
Le daemon continue même en cas d'échec (le token existant peut encore
être valide), mais logue une erreur bien visible.
"""
global _LAST_RENEWAL_TIME
current_time = time.time()
# 1. Vérification du cache : est-ce que le token est encore valide ?
if current_time - _LAST_RENEWAL_TIME < TOKEN_VALIDITY * 60:
log.debug("Le token est encore valide. Pas de renouvellement nécessaire.")
return True
cmd = [OAUTH2IMAP, "--token_file", OAUTH2_TOKEN_FILE, USER1_PREFIX]
log.info(f"Renouvellement du token OAuth2 ({USER1_PREFIX})…")
if not dry:
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
log.info("Token OAuth2 renouvelé avec succès.")
_LAST_RENEWAL_TIME = current_time
return True
else:
log.error(
f"Échec du renouvellement du token (code {result.returncode}) : "
f"{(result.stderr or result.stdout).strip()[:300]}"
)
return False
except subprocess.TimeoutExpired:
log.error("Timeout lors du renouvellement du token OAuth2 (>60s).")
return False
except FileNotFoundError:
log.error(f"Exécutable oauth2imap introuvable : {OAUTH2IMAP}")
return False
except Exception as e:
log.error(f"Erreur inattendue lors du renouvellement du token : {e}")
return False
else:
return True
# ─────────────────────────────────────────────
# Mode --test
# ─────────────────────────────────────────────
def run_tests() -> bool:
http.client.HTTPConnection.debuglevel = 1
print(f" → payload: {grist_fetch_new_bals()}")
return True
# ─────────────────────────────────────────────
# Grist API
# ─────────────────────────────────────────────
GRIST_HEADERS = {
"Authorization": f"Bearer {GRIST_API_KEY}",
"Content-Type": "application/json",
}
def grist_records_url(table: str) -> str:
return f"{GRIST_BASE_URL}/api/docs/{GRIST_DOC_ID}/tables/{table}/records"
def grist_fetch_new_bals() -> list[dict]:
"""
Retourne les BAL où Synchronisation = True.
Trié par 'Date_derniere_passe' (le plus vieux en premier).
Exclut les BAL dont la dernière passe a eu lieu il y a moins de MIN_INTERVAL_MINUTES.
"""
try:
# 1. Récupération triée via l'API Grist
params = {
# "limit": NB_CONCURENT_NEW_BAL,
"filter": json.dumps(
{COL_PRIORITY: [False], COL_SYNC: [True], COL_SUCCESS: [False]}
),
"sort": COL_LAST_PASS,
}
resp = requests.get(
grist_records_url(TABLE_BALS),
headers=GRIST_HEADERS,
params=params,
timeout=15,
)
resp.raise_for_status()
records = resp.json().get("records", [])
filtered_bals = []
for r in records:
fields = r["fields"]
# Récupération du timestamp Grist (ex: 1779124323.67)
last_pass_timestamp = fields.get("Date_derniere_passe")
# Si le timestamp existe, on valide l'écart de temps
if last_pass_timestamp is not None:
continue
# Si pas de date (jamais passée)
filtered_bals.append({"id": r["id"], **fields})
return filtered_bals
except requests.RequestException as e:
log.error(f"Erreur lecture Grist BALS : {e}")
return []
def grist_fetch_repass_bals() -> list[dict]:
"""
Retourne les BAL où Synchronisation = True.
Trié par 'Date_derniere_passe' (le plus vieux en premier).
Exclut les BAL dont la dernière passe a eu lieu il y a moins de MIN_INTERVAL_MINUTES.
"""
try:
# 1. Récupération triée via l'API Grist
params = {
# "limit": NB_CONCURENT_REPASS_BAL,
"filter": json.dumps(
{COL_PRIORITY: [False], COL_SYNC: [True], COL_SUCCESS: [True]}
),
"sort": COL_LAST_PASS,
}
resp = requests.get(
grist_records_url(TABLE_BALS),
headers=GRIST_HEADERS,
params=params,
timeout=15,
)
resp.raise_for_status()
records = resp.json().get("records", [])
# 2. On récupère le timestamp actuel en secondes
current_timestamp = time.time()
filtered_bals = []
for r in records:
fields = r["fields"]
# Récupération du timestamp Grist (ex: 1779124323.67)
last_pass_timestamp = fields.get("Date_derniere_passe")
# Si le timestamp existe, on valide l'écart de temps
if last_pass_timestamp is not None:
continue
try:
# Calcul du temps écoulé depuis la dernière passe
elapsed_time = current_timestamp - float(last_pass_timestamp)
# Si l'écart est inférieur à 15 minutes (900 secondes), on l'exclut
if elapsed_time < MIN_INTERVAL_MINUTES * 60:
log.debug(
f"BAL {r['id']} ignorée : passée il y a seulement {int(elapsed_time // 60)} min."
)
continue
except (ValueError, TypeError):
log.warning(
f"Format de timestamp invalide pour la BAL {r['id']} : {last_pass_timestamp}. "
f"Incluse par sécurité."
)
else:
continue
# Si assez ancienne, on l'ajoute
filtered_bals.append({"id": r["id"], **fields})
return filtered_bals
except requests.RequestException as e:
log.error(f"Erreur lecture Grist BALS : {e}")
return []
def grist_fetch_priority_bals() -> dict:
"""
Retourne les BAL où Synchronisation = True.
Trié par 'Date_derniere_passe' (le plus vieux en premier).
Exclut les BAL dont la dernière passe a eu lieu il y a moins de MIN_INTERVAL_MINUTES.
"""
log.info("grist_priority_bals")
try:
# 1. Récupération triée via l'API Grist
params = {
# "limit": NB_CONCURENT_PRIORITY_BAL,
"filter": json.dumps({COL_PRIORITY: [True], COL_SYNC: [True]}),
"sort": COL_LAST_PASS,
}
resp = requests.get(
grist_records_url(TABLE_BALS),
headers=GRIST_HEADERS,
params=params,
timeout=15,
)
resp.raise_for_status()
records = resp.json().get("records", [])
r = records[0]
fields = r["fields"]
# Vérification de la condition de synchronisation globale
if fields.get(COL_SYNC) is not True:
return {}
# Au moins une passe déjà faite
if fields.get("Date_derniere_passe") is None:
return {}
return r
except requests.RequestException as e:
log.error(f"Erreur lecture Grist BALS : {e}")
return {}
def grist_add_historique(
email: str,
status: str,
nb_emails: int,
duree_sec: int,
log_text: str,
dry: bool = False,
):
"""Ajoute un enregistrement dans la table HISTORIQUE."""
final_status = ("dry:" + status) if dry else status
payload = {
"records": [
{
"fields": {
COL_BAL: email,
COL_STATUS: final_status,
COL_NB_EMAILS: nb_emails,
COL_DUREE_SEC: duree_sec,
COL_LOG: log_text[:4000],
COL_DATE: datetime.now(timezone.utc).isoformat(),
}
}
]
}
# print(f" → payload: {payload}")
try:
resp = requests.post(
grist_records_url(TABLE_HISTORIQUE),
headers=GRIST_HEADERS,
json=payload,
timeout=15,
)
resp.raise_for_status()
log.info(f"[{email}] Résultat écrit dans HISTORIQUE ({final_status})")
except requests.RequestException as e:
log.error(f"[{email}] Erreur écriture Grist HISTORIQUE : {e} \n {resp.text}")
# ─────────────────────────────────────────────
# Partition nouvelles BAL / re-passes
# ─────────────────────────────────────────────
# def partition_bals(rows: list[dict], available: int) -> list[dict]:
# """
# Répartit les slots disponibles (`available`) entre nouvelles BAL et re-passes.
# - NB_RESERVE slots sont strictement exclusifs aux re-passes.
# - Le reste des slots est réparti selon le NEW_RATIO.
# - Si une file est vide, ses slots standards basculent sur l'autre.
# - La réserve des repasses ne bascule JAMAIS vers les nouvelles.
# """
# # On borne les slots disponibles par la capacité maximale de notre système
# slots_dispos = min(available, N_WORKERS)
# if slots_dispos <= 0:
# return []
# # 1. Séparation des deux files
# nouvelles = [r for r in rows if not r.get(COL_NB_SYNCS)]
# repasses = [r for r in rows if r.get(COL_NB_SYNCS)]
# # 2. Isolation de la réserve exclusive
# # Si on a moins de slots dispos que la réserve demandée, la réserve prend tout
# slots_reserve_exclusive = min(NB_RESERVE, slots_dispos)
# slots_partageables = slots_dispos - slots_reserve_exclusive
# # 3. Répartition théorique des slots partageables selon le NEW_RATIO
# quota_new = math.ceil(slots_partageables * NEW_RATIO)
# quota_rep_standard = slots_partageables - quota_new
# # Le quota total théorique des repasses inclut sa réserve exclusive
# quota_rep_total = quota_rep_standard + slots_reserve_exclusive
# # 4. Premier round de sélection (dans la limite des éléments disponibles)
# picked_new = nouvelles[:quota_new]
# picked_rep = repasses[:quota_rep_total]
# # 5. Gestion des surplus (Bascule asymétrique)
# surplus_new = quota_new - len(picked_new)
# # Le surplus des repasses ne peut venir QUE de leur quota standard, pas de la réserve
# surplus_rep_standard = max(0, quota_rep_standard - len(picked_rep))
# if surplus_new > 0:
# # Les nouvelles n'en veulent plus -> tout le rab va aux repasses
# picked_rep += repasses[quota_rep_total : quota_rep_total + surplus_new]
# if surplus_rep_standard > 0:
# # Les repasses n'ont pas assez de jobs pour leur quota standard -> le rab va aux nouvelles
# picked_new += nouvelles[quota_new : quota_new + surplus_rep_standard]
# # 6. Sécurité : Les nouvelles ne doivent JAMAIS empiéter sur la réserve exclusive
# assert len(picked_new) <= slots_partageables
# assert len(picked_new) + len(picked_rep) <= slots_dispos
# # 7. Intercalage des jobs (Alternative au zip pour garder l'ordre un par un)
# result = []
# i, j = 0, 0
# while i < len(picked_new) or j < len(picked_rep):
# if i < len(picked_new):
# result.append(picked_new[i])
# i += 1
# if j < len(picked_rep):
# result.append(picked_rep[j])
# j += 1
# return result
# ─────────────────────────────────────────────
# Parse de la sortie imapsync
# ─────────────────────────────────────────────
def parse_imapsync_output(output: str) -> dict:
nb_emails = 0
status = "success"
error_msg = ""
m = re.search(r"Messages transferred :\s+(\d+)$", output, re.IGNORECASE)
if m:
nb_emails = int(m.group(1))
if re.search(r"Exiting with return value\s+0", output):
status = "success"
elif re.search(r"Exiting with return value\s+[^0]", output):
status = "error"
err_match = re.search(
r"^(Error|Err|FATAL):?\s+(.+)$", output, re.MULTILINE | re.IGNORECASE
)
if err_match:
error_msg = err_match.group(2).strip()
return {"nb_emails": nb_emails, "status": status, "error_msg": error_msg}
# ─────────────────────────────────────────────
# Commande imapsync
# ─────────────────────────────────────────────
def build_imapsync_cmd(email: str, dry: bool = False) -> list[str]:
cmd = [
IMAPSYNC,
"--host1",
HOST1,
"--user1",
email,
"--authuser1",
USER1_PREFIX,
"--oauthaccesstoken1",
OAUTH2_TOKEN_FILE,
"--host2",
HOST2,
"--user2",
email,
"--authuser2",
USER2_PREFIX,
"--password2",
PASS2,
"--nofoldersizes",
"--noreleasecheck",
"--office1",
"--automap",
"--delete2duplicates",
"--addheader",
"--tmpdir",
TMPDIR_CACHE,
"--usecache",
"--exclude",
r"^Calendrier(/|$)",
"--exclude",
r"^Contacts(/|$)",
"--exclude",
r"^Boîte d'envoi(/|$)",
"--exclude",
r"^Flux RSS(/|$)",
"--exclude",
r"^Historique des conversations(/|$)",
"--exclude",
r"^Journal(/|$)",
"--exclude",
r"^Notes(/|$)",
"--exclude",
r"^Probl&AOg-mes de synchronisation(/|$)",
"--exclude",
r"^R&AOk-p&AOk-t&AOk-(/|$)",
"--exclude",
r"^T&AOI-ches(/|$)",
"--exclude",
r"^Contacts sugg&AOk-r&AOk-s(/|$)",
"--f1f2",
"Courrier indésirable=Pourriel",
]
for year in range(2000, 2027):
cmd += ["--f1f2", f"Archive/{year}=Archives.{year}"]
cmd += ["--f1f2", "Archive=Archives"]
if dry:
cmd.append("--dry")
return cmd
# ─────────────────────────────────────────────
# Worker : exécute un job imapsync
# ─────────────────────────────────────────────
_running: set[str] = set()
def run_sync_job(row: dict, dry: bool = False) -> None:
email = row.get(COL_EMAIL, "").strip()
if not email:
log.warning(f"Ligne {row.get('id')} sans courriel, ignorée.")
return
if email in _running:
log.info(f"[{email}] Déjà en cours, skip.")
return
_running.add(email)
dry_tag = " [DRY]" if dry else ""
log.info(f"[{email}] Démarrage{dry_tag}")
cmd = build_imapsync_cmd(email, dry=dry)
start = time.monotonic()
try:
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=IMAPSYNC_TIMEOUT
)
output = result.stdout + "\n" + result.stderr
elapsed = int(time.monotonic() - start)
parsed = parse_imapsync_output(output)
if result.returncode != 0 and parsed["status"] == "success":
parsed["status"] = "error"
parsed["error_msg"] = f"Exit code {result.returncode}"
log_text = output if parsed["status"] == "error" else result.stdout[-3000:]
log.info(
f"[{email}] Terminé{dry_tag} — statut={parsed['status']} "
f"emails={parsed['nb_emails']} durée={elapsed}s"
)
grist_add_historique(
email, parsed["status"], parsed["nb_emails"], elapsed, log_text, dry=dry
)
except subprocess.TimeoutExpired:
elapsed = int(time.monotonic() - start)
log.error(f"[{email}] Timeout après {elapsed}s")
grist_add_historique(
email, "timeout", 0, elapsed, f"Timeout {elapsed}s dépassé", dry=dry
)
except Exception as e:
elapsed = int(time.monotonic() - start)
log.error(f"[{email}] Exception inattendue : {e}")
grist_add_historique(email, "error", 0, elapsed, str(e), dry=dry)
finally:
_running.discard(email)
# ─────────────────────────────────────────────
# Boucle principale
# ─────────────────────────────────────────────
_shutdown = False
def handle_signal(signum, frame):
global _shutdown
log.info(f"Signal {signum} reçu — arrêt propre en cours…")
_shutdown = True
signal.signal(signal.SIGTERM, handle_signal)
signal.signal(signal.SIGINT, handle_signal)
def main():
dry = ARGS.dry
log.info("" * 60)
log.info(
f"imapsync daemon démarré — workers={NB_MAX_WORKERS} "
f"poll={POLL_INTERVAL}s " + (" [MODE DRY]" if dry else "")
)
log.info(f"Source : {HOST1} → Destination : {HOST2}")
log.info("" * 60)
if not dry:
Path(TMPDIR_CACHE).mkdir(parents=True, exist_ok=True)
with ThreadPoolExecutor(max_workers=NB_MAX_WORKERS) as pool:
# Dictionnaires pour suivre les futures par catégorie
futures_new_bals: dict = {} # Futures pour les nouvelles BALs
futures_repass_bals: dict = {} # Futures pour les repasses
futures_priority_bals: dict = {} # Futures pour les prioritaires
while not _shutdown:
# Nettoyage des futures terminées pour chaque catégorie
for category, futures_dict in [
("new_bals", futures_new_bals),
("repasses", futures_repass_bals),
("priority", futures_priority_bals),
]:
done = [f for f in list(futures_dict) if f.done()]
for f in done:
email = futures_dict.pop(f)
try:
f.result()
except Exception as e:
log.error(f"[{email}] Erreur non gérée ({category}): {e}")
# Renouvellement du token à chaque cycle de polling
renew_oauth2_token(dry)
# Remplissage des slots disponibles pour les nouvelles BALs
available = NB_CONCURENT_NEW_BAL - len(futures_new_bals)
if available > 0:
# Traitement des nouvelles BALs
rows_new_bals = grist_fetch_new_bals()
rows_new_bals = [
r
for r in rows_new_bals
if r.get(COL_EMAIL, "").strip() not in _running
]
for row in rows_new_bals:
email = row.get(COL_EMAIL, "").strip()
if (
email not in futures_new_bals
and email not in futures_repass_bals
and email not in futures_priority_bals
):
f = pool.submit(run_sync_job, row, dry)
futures_new_bals[f] = email
log.info(f"[{email}] Job (new_bals) soumis au pool")
available -= 1
if available <= 0:
break
# Traitement des repasses )
available = NB_CONCURENT_REPASS_BAL - len(futures_repass_bals)
if available > 0:
rows_repasses = grist_fetch_repass_bals()
rows_repasses = [
r
for r in rows_repasses
if r.get(COL_EMAIL, "").strip() not in _running
]
for row in rows_repasses:
email = row.get(COL_EMAIL, "").strip()
if (
email not in futures_new_bals
and email not in futures_repass_bals
and email not in futures_priority_bals
):
f = pool.submit(run_sync_job, row, dry)
futures_repass_bals[f] = email
log.info(f"[{email}] Job (repasses) soumis au pool")
available -= 1
if available <= 0:
break
# Ajout des BALs prioritaires
available = NB_CONCURENT_PRIORITY_BAL - len(futures_priority_bals)
if available > 0:
rows = grist_fetch_priority_bals()
rows = [r for r in rows if r.get(COL_EMAIL, "").strip() not in _running]
for row in rows:
email = row.get(COL_EMAIL, "").strip()
if (
email not in futures_new_bals
and email not in futures_repass_bals
and email not in futures_priority_bals
):
f = pool.submit(run_sync_job, row, dry)
futures_priority_bals[f] = email
log.info(f"[{email}] Job (priority) soumis au pool")
available -= 1
if available <= 0:
break
# Attente conditionnelle : POLL_INTERVAL ou qu'un thread se termine
if not _shutdown:
all_futures = (
list(futures_new_bals.keys())
+ list(futures_repass_bals.keys())
+ list(futures_priority_bals.keys())
)
if all_futures:
# Attendre soit POLL_INTERVAL, soit qu'au moins un future se termine
wait(
all_futures,
timeout=POLL_INTERVAL,
return_when=FIRST_COMPLETED,
)
else:
# Si aucun future en cours, attendre simplement POLL_INTERVAL
time.sleep(POLL_INTERVAL)
log.info("Daemon arrêté proprement.")
if __name__ == "__main__":
if ARGS.test:
sys.exit(0 if run_tests() else 1)
main()