From 672b2546d96a08614c4e5c4770d423c7dfbc2622 Mon Sep 17 00:00:00 2001 From: MAURA Mathieu Date: Tue, 16 Jun 2026 01:51:59 +0200 Subject: [PATCH] correction de l'algo de repartition des threads --- .env.example | 24 +-- imapsync_daemon.py | 374 ++++++++++++++++++++++++++++++--------------- 2 files changed, 265 insertions(+), 133 deletions(-) diff --git a/.env.example b/.env.example index f1581da..87b49b5 100644 --- a/.env.example +++ b/.env.example @@ -17,14 +17,15 @@ COL_EMAIL=Courriel #Addresse a synchoniser COL_SYNC=Synchronisation #Bool si vrai, synchoniser COL_PRIORITY=priority COL_NB_SYNCS=Nb_syncs #contient le nombre de synchonisation de la BAL - - # Colonnes Grist TABLE_HISTORIQUE - COL_BAL=Bal - COL_STATUS=Status - COL_NB_EMAILS=Nb_emails - COL_DUREE_SEC=Duree_sec - COL_LOG=Log - COL_DATE=Date +COL_SUCCESS=Synchonisee_au_moins_une_fois +COL_LAST_PASS=Date_derniere_passe +# Colonnes Grist TABLE_HISTORIQUE +COL_BAL=Bal +COL_STATUS=Status +COL_NB_EMAILS=Nb_emails +COL_DUREE_SEC=Duree_sec +COL_LOG=Log +COL_DATE=Date # ── Serveurs IMAP ───────────────────────────────────────────── HOST1=imap.source.fr @@ -43,7 +44,6 @@ IMAPSYNC=/usr/bin/imapsync TMPDIR_CACHE=/home/migration/imapsync_cache # ── Parallélisme et polling ─────────────────────────────────── -N_WORKERS=4 -POLL_INTERVAL=30 -# Part des slots réservés aux BAL jamais synchronisées (0.0 – 1.0, défaut 20%) -NEW_RATIO=0.20 +NB_CONCURENT_NEW_BAL=3 +NB_CONCURENT_REPASS_BAL=3 +NB_CONCURENT_PRIORITY_BAL=3 diff --git a/imapsync_daemon.py b/imapsync_daemon.py index 48bd22c..89254a2 100755 --- a/imapsync_daemon.py +++ b/imapsync_daemon.py @@ -41,14 +41,13 @@ import http import http.client import json import logging -import math import os import re import signal import subprocess import sys import time -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait from datetime import datetime, timezone from pathlib import Path @@ -114,23 +113,33 @@ OAUTH2_TOKEN_FILE = require_env("OAUTH2_TOKEN_FILE") PASS2 = require_env("PASS2") IMAPSYNC = opt_env("IMAPSYNC", "/usr/bin/imapsync") IMAPSYNC_TIMEOUT = 72000 -NB_RESERVE = 1 +# NB_RESERVE = 1 OAUTH2IMAP = opt_env("OAUTH2IMAP", "/usr/bin/oauth2imap") TMPDIR_CACHE = opt_env("TMPDIR_CACHE", "/tmp/imapsync_cache") -N_WORKERS = int(opt_env("N_WORKERS", "4")) POLL_INTERVAL = int(opt_env("POLL_INTERVAL", "30")) -NEW_RATIO = float(opt_env("NEW_RATIO", "0.20")) + +NB_CONCURENT_NEW_BAL = int(opt_env("NB_CONCURENT_NEW_BAL", "3")) +NB_CONCURENT_REPASS_BAL = int(opt_env("NB_CONCURENT_REPASS_BAL", "3")) +NB_CONCURENT_PRIORITY_BAL = int(opt_env("NB_CONCURENT_PRIORITY_BAL", "3")) + +NB_MAX_WORKERS = ( + NB_CONCURENT_NEW_BAL + NB_CONCURENT_REPASS_BAL + NB_CONCURENT_PRIORITY_BAL +) + +# NEW_RATIO = float(opt_env("NEW_RATIO", "0.20")) # Interval en minute minimum entre 2 vérification d'une meme BAL MIN_INTERVAL_MINUTES = 15 -TABLE_BALS = require_env("TABLE_BALS") -# Colonnes Grist +TABLE_BALS = require_env("TABLE_BALS") COL_EMAIL = require_env("COL_EMAIL") COL_SYNC = require_env("COL_SYNC") COL_NB_SYNCS = require_env("COL_NB_SYNCS") COL_PRIORITY = require_env("COL_PRIORITY") +COL_SUCCESS = require_env("COL_SUCCESS") +COL_LAST_PASS = require_env("COL_LAST_PASS") + TABLE_HISTORIQUE = require_env("TABLE_HISTORIQUE") COL_BAL = require_env("COL_BAL") COL_STATUS = require_env("COL_STATUS") @@ -141,7 +150,6 @@ COL_DATE = require_env("COL_DATE") TOKEN_VALIDITY = 30 # Minutes - _LAST_RENEWAL_TIME = 0.0 # ───────────────────────────────────────────── # Logging @@ -161,7 +169,7 @@ log = logging.getLogger(__name__) # ───────────────────────────────────────────── # Renouvellement du token OAuth2 # ───────────────────────────────────────────── -def renew_oauth2_token() -> bool: +def renew_oauth2_token(dry: bool) -> bool: """ Exécute : OAUTH2IMAP --token_file OAUTH2_TOKEN_FILE USER1_PREFIX Retourne True si le renouvellement a réussi, False sinon. @@ -179,27 +187,30 @@ def renew_oauth2_token() -> bool: cmd = [OAUTH2IMAP, "--token_file", OAUTH2_TOKEN_FILE, USER1_PREFIX] log.info(f"Renouvellement du token OAuth2 ({USER1_PREFIX})…") - try: - result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) - if result.returncode == 0: - log.info("Token OAuth2 renouvelé avec succès.") - _LAST_RENEWAL_TIME = current_time - return True - else: - log.error( - f"Échec du renouvellement du token (code {result.returncode}) : " - f"{(result.stderr or result.stdout).strip()[:300]}" - ) + if not dry: + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) + if result.returncode == 0: + log.info("Token OAuth2 renouvelé avec succès.") + _LAST_RENEWAL_TIME = current_time + return True + else: + log.error( + f"Échec du renouvellement du token (code {result.returncode}) : " + f"{(result.stderr or result.stdout).strip()[:300]}" + ) + return False + except subprocess.TimeoutExpired: + log.error("Timeout lors du renouvellement du token OAuth2 (>60s).") return False - except subprocess.TimeoutExpired: - log.error("Timeout lors du renouvellement du token OAuth2 (>60s).") - return False - except FileNotFoundError: - log.error(f"Exécutable oauth2imap introuvable : {OAUTH2IMAP}") - return False - except Exception as e: - log.error(f"Erreur inattendue lors du renouvellement du token : {e}") - return False + except FileNotFoundError: + log.error(f"Exécutable oauth2imap introuvable : {OAUTH2IMAP}") + return False + except Exception as e: + log.error(f"Erreur inattendue lors du renouvellement du token : {e}") + return False + else: + return True # ───────────────────────────────────────────── @@ -207,7 +218,7 @@ def renew_oauth2_token() -> bool: # ───────────────────────────────────────────── def run_tests() -> bool: http.client.HTTPConnection.debuglevel = 1 - print(f" → payload: {grist_priority_bals()}") + print(f" → payload: {grist_fetch_new_bals()}") return True @@ -224,7 +235,7 @@ def grist_records_url(table: str) -> str: return f"{GRIST_BASE_URL}/api/docs/{GRIST_DOC_ID}/tables/{table}/records" -def grist_fetch_bals() -> list[dict]: +def grist_fetch_new_bals() -> list[dict]: """ Retourne les BAL où Synchronisation = True. Trié par 'Date_derniere_passe' (le plus vieux en premier). @@ -232,7 +243,59 @@ def grist_fetch_bals() -> list[dict]: """ try: # 1. Récupération triée via l'API Grist - params = {"sort": "Date_derniere_passe"} + params = { + # "limit": NB_CONCURENT_NEW_BAL, + "filter": json.dumps( + {COL_PRIORITY: [False], COL_SYNC: [True], COL_SUCCESS: [False]} + ), + "sort": COL_LAST_PASS, + } + resp = requests.get( + grist_records_url(TABLE_BALS), + headers=GRIST_HEADERS, + params=params, + timeout=15, + ) + resp.raise_for_status() + records = resp.json().get("records", []) + + filtered_bals = [] + + for r in records: + fields = r["fields"] + + # Récupération du timestamp Grist (ex: 1779124323.67) + last_pass_timestamp = fields.get("Date_derniere_passe") + + # Si le timestamp existe, on valide l'écart de temps + if last_pass_timestamp is not None: + continue + + # Si pas de date (jamais passée) + filtered_bals.append({"id": r["id"], **fields}) + + return filtered_bals + + except requests.RequestException as e: + log.error(f"Erreur lecture Grist BALS : {e}") + return [] + + +def grist_fetch_repass_bals() -> list[dict]: + """ + Retourne les BAL où Synchronisation = True. + Trié par 'Date_derniere_passe' (le plus vieux en premier). + Exclut les BAL dont la dernière passe a eu lieu il y a moins de MIN_INTERVAL_MINUTES. + """ + try: + # 1. Récupération triée via l'API Grist + params = { + # "limit": NB_CONCURENT_REPASS_BAL, + "filter": json.dumps( + {COL_PRIORITY: [False], COL_SYNC: [True], COL_SUCCESS: [True]} + ), + "sort": COL_LAST_PASS, + } resp = requests.get( grist_records_url(TABLE_BALS), headers=GRIST_HEADERS, @@ -249,15 +312,12 @@ def grist_fetch_bals() -> list[dict]: for r in records: fields = r["fields"] - # Vérification de la condition de synchronisation globale - if fields.get(COL_SYNC) is not True: - continue - # Récupération du timestamp Grist (ex: 1779124323.67) last_pass_timestamp = fields.get("Date_derniere_passe") # Si le timestamp existe, on valide l'écart de temps if last_pass_timestamp is not None: + continue try: # Calcul du temps écoulé depuis la dernière passe elapsed_time = current_timestamp - float(last_pass_timestamp) @@ -273,8 +333,10 @@ def grist_fetch_bals() -> list[dict]: f"Format de timestamp invalide pour la BAL {r['id']} : {last_pass_timestamp}. " f"Incluse par sécurité." ) + else: + continue - # Si pas de date (jamais passée) ou si assez ancienne, on l'ajoute + # Si assez ancienne, on l'ajoute filtered_bals.append({"id": r["id"], **fields}) return filtered_bals @@ -284,7 +346,7 @@ def grist_fetch_bals() -> list[dict]: return [] -def grist_priority_bals() -> dict: +def grist_fetch_priority_bals() -> dict: """ Retourne les BAL où Synchronisation = True. Trié par 'Date_derniere_passe' (le plus vieux en premier). @@ -294,8 +356,9 @@ def grist_priority_bals() -> dict: try: # 1. Récupération triée via l'API Grist params = { - "limit": 1, + # "limit": NB_CONCURENT_PRIORITY_BAL, "filter": json.dumps({COL_PRIORITY: [True], COL_SYNC: [True]}), + "sort": COL_LAST_PASS, } resp = requests.get( grist_records_url(TABLE_BALS), @@ -366,69 +429,69 @@ def grist_add_historique( # ───────────────────────────────────────────── # Partition nouvelles BAL / re-passes # ───────────────────────────────────────────── -def partition_bals(rows: list[dict], available: int) -> list[dict]: - """ - Répartit les slots disponibles (`available`) entre nouvelles BAL et re-passes. +# def partition_bals(rows: list[dict], available: int) -> list[dict]: +# """ +# Répartit les slots disponibles (`available`) entre nouvelles BAL et re-passes. - - NB_RESERVE slots sont strictement exclusifs aux re-passes. - - Le reste des slots est réparti selon le NEW_RATIO. - - Si une file est vide, ses slots standards basculent sur l'autre. - - La réserve des repasses ne bascule JAMAIS vers les nouvelles. - """ - # On borne les slots disponibles par la capacité maximale de notre système - slots_dispos = min(available, N_WORKERS) - if slots_dispos <= 0: - return [] +# - NB_RESERVE slots sont strictement exclusifs aux re-passes. +# - Le reste des slots est réparti selon le NEW_RATIO. +# - Si une file est vide, ses slots standards basculent sur l'autre. +# - La réserve des repasses ne bascule JAMAIS vers les nouvelles. +# """ +# # On borne les slots disponibles par la capacité maximale de notre système +# slots_dispos = min(available, N_WORKERS) +# if slots_dispos <= 0: +# return [] - # 1. Séparation des deux files - nouvelles = [r for r in rows if not r.get(COL_NB_SYNCS)] - repasses = [r for r in rows if r.get(COL_NB_SYNCS)] +# # 1. Séparation des deux files +# nouvelles = [r for r in rows if not r.get(COL_NB_SYNCS)] +# repasses = [r for r in rows if r.get(COL_NB_SYNCS)] - # 2. Isolation de la réserve exclusive - # Si on a moins de slots dispos que la réserve demandée, la réserve prend tout - slots_reserve_exclusive = min(NB_RESERVE, slots_dispos) - slots_partageables = slots_dispos - slots_reserve_exclusive +# # 2. Isolation de la réserve exclusive +# # Si on a moins de slots dispos que la réserve demandée, la réserve prend tout +# slots_reserve_exclusive = min(NB_RESERVE, slots_dispos) +# slots_partageables = slots_dispos - slots_reserve_exclusive - # 3. Répartition théorique des slots partageables selon le NEW_RATIO - quota_new = math.ceil(slots_partageables * NEW_RATIO) - quota_rep_standard = slots_partageables - quota_new +# # 3. Répartition théorique des slots partageables selon le NEW_RATIO +# quota_new = math.ceil(slots_partageables * NEW_RATIO) +# quota_rep_standard = slots_partageables - quota_new - # Le quota total théorique des repasses inclut sa réserve exclusive - quota_rep_total = quota_rep_standard + slots_reserve_exclusive +# # Le quota total théorique des repasses inclut sa réserve exclusive +# quota_rep_total = quota_rep_standard + slots_reserve_exclusive - # 4. Premier round de sélection (dans la limite des éléments disponibles) - picked_new = nouvelles[:quota_new] - picked_rep = repasses[:quota_rep_total] +# # 4. Premier round de sélection (dans la limite des éléments disponibles) +# picked_new = nouvelles[:quota_new] +# picked_rep = repasses[:quota_rep_total] - # 5. Gestion des surplus (Bascule asymétrique) - surplus_new = quota_new - len(picked_new) - # Le surplus des repasses ne peut venir QUE de leur quota standard, pas de la réserve - surplus_rep_standard = max(0, quota_rep_standard - len(picked_rep)) +# # 5. Gestion des surplus (Bascule asymétrique) +# surplus_new = quota_new - len(picked_new) +# # Le surplus des repasses ne peut venir QUE de leur quota standard, pas de la réserve +# surplus_rep_standard = max(0, quota_rep_standard - len(picked_rep)) - if surplus_new > 0: - # Les nouvelles n'en veulent plus -> tout le rab va aux repasses - picked_rep += repasses[quota_rep_total : quota_rep_total + surplus_new] +# if surplus_new > 0: +# # Les nouvelles n'en veulent plus -> tout le rab va aux repasses +# picked_rep += repasses[quota_rep_total : quota_rep_total + surplus_new] - if surplus_rep_standard > 0: - # Les repasses n'ont pas assez de jobs pour leur quota standard -> le rab va aux nouvelles - picked_new += nouvelles[quota_new : quota_new + surplus_rep_standard] +# if surplus_rep_standard > 0: +# # Les repasses n'ont pas assez de jobs pour leur quota standard -> le rab va aux nouvelles +# picked_new += nouvelles[quota_new : quota_new + surplus_rep_standard] - # 6. Sécurité : Les nouvelles ne doivent JAMAIS empiéter sur la réserve exclusive - assert len(picked_new) <= slots_partageables - assert len(picked_new) + len(picked_rep) <= slots_dispos +# # 6. Sécurité : Les nouvelles ne doivent JAMAIS empiéter sur la réserve exclusive +# assert len(picked_new) <= slots_partageables +# assert len(picked_new) + len(picked_rep) <= slots_dispos - # 7. Intercalage des jobs (Alternative au zip pour garder l'ordre un par un) - result = [] - i, j = 0, 0 - while i < len(picked_new) or j < len(picked_rep): - if i < len(picked_new): - result.append(picked_new[i]) - i += 1 - if j < len(picked_rep): - result.append(picked_rep[j]) - j += 1 +# # 7. Intercalage des jobs (Alternative au zip pour garder l'ordre un par un) +# result = [] +# i, j = 0, 0 +# while i < len(picked_new) or j < len(picked_rep): +# if i < len(picked_new): +# result.append(picked_new[i]) +# i += 1 +# if j < len(picked_rep): +# result.append(picked_rep[j]) +# j += 1 - return result +# return result # ───────────────────────────────────────────── @@ -608,52 +671,121 @@ def main(): dry = ARGS.dry log.info("═" * 60) log.info( - f"imapsync daemon démarré — workers={N_WORKERS} " - f"poll={POLL_INTERVAL}s new_ratio={NEW_RATIO:.0%}" - + (" [MODE DRY]" if dry else "") + f"imapsync daemon démarré — workers={NB_MAX_WORKERS} " + f"poll={POLL_INTERVAL}s " + (" [MODE DRY]" if dry else "") ) log.info(f"Source : {HOST1} → Destination : {HOST2}") log.info("═" * 60) - Path(TMPDIR_CACHE).mkdir(parents=True, exist_ok=True) + if not dry: + Path(TMPDIR_CACHE).mkdir(parents=True, exist_ok=True) - with ThreadPoolExecutor(max_workers=N_WORKERS) as pool: - futures: dict = {} + with ThreadPoolExecutor(max_workers=NB_MAX_WORKERS) as pool: + # Dictionnaires pour suivre les futures par catégorie + futures_new_bals: dict = {} # Futures pour les nouvelles BALs + futures_repass_bals: dict = {} # Futures pour les repasses + futures_priority_bals: dict = {} # Futures pour les prioritaires while not _shutdown: - # Nettoyage des futures terminées - done = [f for f in list(futures) if f.done()] - for f in done: - email = futures.pop(f) - try: - f.result() - except Exception as e: - log.error(f"[{email}] Erreur non gérée : {e}") + # Nettoyage des futures terminées pour chaque catégorie + for category, futures_dict in [ + ("new_bals", futures_new_bals), + ("repasses", futures_repass_bals), + ("priority", futures_priority_bals), + ]: + done = [f for f in list(futures_dict) if f.done()] + for f in done: + email = futures_dict.pop(f) + try: + f.result() + except Exception as e: + log.error(f"[{email}] Erreur non gérée ({category}): {e}") # Renouvellement du token à chaque cycle de polling - renew_oauth2_token() + renew_oauth2_token(dry) - # Remplissage des slots disponibles - available = N_WORKERS - len(futures) + # Remplissage des slots disponibles pour les nouvelles BALs + available = NB_CONCURENT_NEW_BAL - len(futures_new_bals) if available > 0: - rows = grist_fetch_bals() - rows = [r for r in rows if r.get(COL_EMAIL, "").strip() not in _running] - for row in partition_bals(rows, available): + # Traitement des nouvelles BALs + rows_new_bals = grist_fetch_new_bals() + rows_new_bals = [ + r + for r in rows_new_bals + if r.get(COL_EMAIL, "").strip() not in _running + ] + for row in rows_new_bals: email = row.get(COL_EMAIL, "").strip() - f = pool.submit(run_sync_job, row, dry) - futures[f] = email - log.info(f"[{email}] Job soumis au pool") + if ( + email not in futures_new_bals + and email not in futures_repass_bals + and email not in futures_priority_bals + ): + f = pool.submit(run_sync_job, row, dry) + futures_new_bals[f] = email + log.info(f"[{email}] Job (new_bals) soumis au pool") + available -= 1 + if available <= 0: + break - # Ajout de la BAL prioritaire - row = grist_priority_bals() - email = row.get(COL_EMAIL, "").strip() - if email not in _running and email != "": - f = pool.submit(run_sync_job, row, dry) - futures[f] = email - log.info(f"[{email}] Job soumis au pool") + # Traitement des repasses ) + available = NB_CONCURENT_REPASS_BAL - len(futures_new_bals) + if available > 0: + rows_repasses = grist_fetch_repass_bals() + rows_repasses = [ + r + for r in rows_repasses + if r.get(COL_EMAIL, "").strip() not in _running + ] + for row in rows_repasses: + email = row.get(COL_EMAIL, "").strip() + if ( + email not in futures_new_bals + and email not in futures_repass_bals + and email not in futures_priority_bals + ): + f = pool.submit(run_sync_job, row, dry) + futures_repass_bals[f] = email + log.info(f"[{email}] Job (repasses) soumis au pool") + available -= 1 + if available <= 0: + break + # Ajout des BALs prioritaires + available = NB_CONCURENT_PRIORITY_BAL - len(futures_new_bals) + if available > 0: + rows = grist_fetch_priority_bals() + rows = [r for r in rows if r.get(COL_EMAIL, "").strip() not in _running] + for row in rows: + email = row.get(COL_EMAIL, "").strip() + if ( + email not in futures_new_bals + and email not in futures_repass_bals + and email not in futures_priority_bals + ): + f = pool.submit(run_sync_job, row, dry) + futures_priority_bals[f] = email + log.info(f"[{email}] Job (priority) soumis au pool") + available -= 1 + if available <= 0: + break + # Attente conditionnelle : POLL_INTERVAL ou qu'un thread se termine if not _shutdown: - time.sleep(POLL_INTERVAL) + all_futures = ( + list(futures_new_bals.keys()) + + list(futures_repass_bals.keys()) + + list(futures_priority_bals.keys()) + ) + if all_futures: + # Attendre soit POLL_INTERVAL, soit qu'au moins un future se termine + wait( + all_futures, + timeout=POLL_INTERVAL, + return_when=FIRST_COMPLETED, + ) + else: + # Si aucun future en cours, attendre simplement POLL_INTERVAL + time.sleep(POLL_INTERVAL) log.info("Daemon arrêté proprement.")