correction de l'algo de repartition des threads

This commit is contained in:
2026-06-16 01:51:59 +02:00
parent 4df38342f9
commit 672b2546d9
2 changed files with 265 additions and 133 deletions
+5 -5
View File
@@ -17,7 +17,8 @@ COL_EMAIL=Courriel #Addresse a synchoniser
COL_SYNC=Synchronisation #Bool si vrai, synchoniser COL_SYNC=Synchronisation #Bool si vrai, synchoniser
COL_PRIORITY=priority COL_PRIORITY=priority
COL_NB_SYNCS=Nb_syncs #contient le nombre de synchonisation de la BAL COL_NB_SYNCS=Nb_syncs #contient le nombre de synchonisation de la BAL
COL_SUCCESS=Synchonisee_au_moins_une_fois
COL_LAST_PASS=Date_derniere_passe
# Colonnes Grist TABLE_HISTORIQUE # Colonnes Grist TABLE_HISTORIQUE
COL_BAL=Bal COL_BAL=Bal
COL_STATUS=Status COL_STATUS=Status
@@ -43,7 +44,6 @@ IMAPSYNC=/usr/bin/imapsync
TMPDIR_CACHE=/home/migration/imapsync_cache TMPDIR_CACHE=/home/migration/imapsync_cache
# ── Parallélisme et polling ─────────────────────────────────── # ── Parallélisme et polling ───────────────────────────────────
N_WORKERS=4 NB_CONCURENT_NEW_BAL=3
POLL_INTERVAL=30 NB_CONCURENT_REPASS_BAL=3
# Part des slots réservés aux BAL jamais synchronisées (0.0 1.0, défaut 20%) NB_CONCURENT_PRIORITY_BAL=3
NEW_RATIO=0.20
+227 -95
View File
@@ -41,14 +41,13 @@ import http
import http.client import http.client
import json import json
import logging import logging
import math
import os import os
import re import re
import signal import signal
import subprocess import subprocess
import sys import sys
import time import time
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
@@ -114,23 +113,33 @@ OAUTH2_TOKEN_FILE = require_env("OAUTH2_TOKEN_FILE")
PASS2 = require_env("PASS2") PASS2 = require_env("PASS2")
IMAPSYNC = opt_env("IMAPSYNC", "/usr/bin/imapsync") IMAPSYNC = opt_env("IMAPSYNC", "/usr/bin/imapsync")
IMAPSYNC_TIMEOUT = 72000 IMAPSYNC_TIMEOUT = 72000
NB_RESERVE = 1 # NB_RESERVE = 1
OAUTH2IMAP = opt_env("OAUTH2IMAP", "/usr/bin/oauth2imap") OAUTH2IMAP = opt_env("OAUTH2IMAP", "/usr/bin/oauth2imap")
TMPDIR_CACHE = opt_env("TMPDIR_CACHE", "/tmp/imapsync_cache") TMPDIR_CACHE = opt_env("TMPDIR_CACHE", "/tmp/imapsync_cache")
N_WORKERS = int(opt_env("N_WORKERS", "4"))
POLL_INTERVAL = int(opt_env("POLL_INTERVAL", "30")) POLL_INTERVAL = int(opt_env("POLL_INTERVAL", "30"))
NEW_RATIO = float(opt_env("NEW_RATIO", "0.20"))
NB_CONCURENT_NEW_BAL = int(opt_env("NB_CONCURENT_NEW_BAL", "3"))
NB_CONCURENT_REPASS_BAL = int(opt_env("NB_CONCURENT_REPASS_BAL", "3"))
NB_CONCURENT_PRIORITY_BAL = int(opt_env("NB_CONCURENT_PRIORITY_BAL", "3"))
NB_MAX_WORKERS = (
NB_CONCURENT_NEW_BAL + NB_CONCURENT_REPASS_BAL + NB_CONCURENT_PRIORITY_BAL
)
# NEW_RATIO = float(opt_env("NEW_RATIO", "0.20"))
# Interval en minute minimum entre 2 vérification d'une meme BAL # Interval en minute minimum entre 2 vérification d'une meme BAL
MIN_INTERVAL_MINUTES = 15 MIN_INTERVAL_MINUTES = 15
TABLE_BALS = require_env("TABLE_BALS")
# Colonnes Grist TABLE_BALS = require_env("TABLE_BALS")
COL_EMAIL = require_env("COL_EMAIL") COL_EMAIL = require_env("COL_EMAIL")
COL_SYNC = require_env("COL_SYNC") COL_SYNC = require_env("COL_SYNC")
COL_NB_SYNCS = require_env("COL_NB_SYNCS") COL_NB_SYNCS = require_env("COL_NB_SYNCS")
COL_PRIORITY = require_env("COL_PRIORITY") COL_PRIORITY = require_env("COL_PRIORITY")
COL_SUCCESS = require_env("COL_SUCCESS")
COL_LAST_PASS = require_env("COL_LAST_PASS")
TABLE_HISTORIQUE = require_env("TABLE_HISTORIQUE") TABLE_HISTORIQUE = require_env("TABLE_HISTORIQUE")
COL_BAL = require_env("COL_BAL") COL_BAL = require_env("COL_BAL")
COL_STATUS = require_env("COL_STATUS") COL_STATUS = require_env("COL_STATUS")
@@ -141,7 +150,6 @@ COL_DATE = require_env("COL_DATE")
TOKEN_VALIDITY = 30 # Minutes TOKEN_VALIDITY = 30 # Minutes
_LAST_RENEWAL_TIME = 0.0 _LAST_RENEWAL_TIME = 0.0
# ───────────────────────────────────────────── # ─────────────────────────────────────────────
# Logging # Logging
@@ -161,7 +169,7 @@ log = logging.getLogger(__name__)
# ───────────────────────────────────────────── # ─────────────────────────────────────────────
# Renouvellement du token OAuth2 # Renouvellement du token OAuth2
# ───────────────────────────────────────────── # ─────────────────────────────────────────────
def renew_oauth2_token() -> bool: def renew_oauth2_token(dry: bool) -> bool:
""" """
Exécute : OAUTH2IMAP --token_file OAUTH2_TOKEN_FILE USER1_PREFIX Exécute : OAUTH2IMAP --token_file OAUTH2_TOKEN_FILE USER1_PREFIX
Retourne True si le renouvellement a réussi, False sinon. Retourne True si le renouvellement a réussi, False sinon.
@@ -179,6 +187,7 @@ def renew_oauth2_token() -> bool:
cmd = [OAUTH2IMAP, "--token_file", OAUTH2_TOKEN_FILE, USER1_PREFIX] cmd = [OAUTH2IMAP, "--token_file", OAUTH2_TOKEN_FILE, USER1_PREFIX]
log.info(f"Renouvellement du token OAuth2 ({USER1_PREFIX})…") log.info(f"Renouvellement du token OAuth2 ({USER1_PREFIX})…")
if not dry:
try: try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60) result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode == 0: if result.returncode == 0:
@@ -200,6 +209,8 @@ def renew_oauth2_token() -> bool:
except Exception as e: except Exception as e:
log.error(f"Erreur inattendue lors du renouvellement du token : {e}") log.error(f"Erreur inattendue lors du renouvellement du token : {e}")
return False return False
else:
return True
# ───────────────────────────────────────────── # ─────────────────────────────────────────────
@@ -207,7 +218,7 @@ def renew_oauth2_token() -> bool:
# ───────────────────────────────────────────── # ─────────────────────────────────────────────
def run_tests() -> bool: def run_tests() -> bool:
http.client.HTTPConnection.debuglevel = 1 http.client.HTTPConnection.debuglevel = 1
print(f" → payload: {grist_priority_bals()}") print(f" → payload: {grist_fetch_new_bals()}")
return True return True
@@ -224,7 +235,7 @@ def grist_records_url(table: str) -> str:
return f"{GRIST_BASE_URL}/api/docs/{GRIST_DOC_ID}/tables/{table}/records" return f"{GRIST_BASE_URL}/api/docs/{GRIST_DOC_ID}/tables/{table}/records"
def grist_fetch_bals() -> list[dict]: def grist_fetch_new_bals() -> list[dict]:
""" """
Retourne les BAL où Synchronisation = True. Retourne les BAL où Synchronisation = True.
Trié par 'Date_derniere_passe' (le plus vieux en premier). Trié par 'Date_derniere_passe' (le plus vieux en premier).
@@ -232,7 +243,59 @@ def grist_fetch_bals() -> list[dict]:
""" """
try: try:
# 1. Récupération triée via l'API Grist # 1. Récupération triée via l'API Grist
params = {"sort": "Date_derniere_passe"} params = {
# "limit": NB_CONCURENT_NEW_BAL,
"filter": json.dumps(
{COL_PRIORITY: [False], COL_SYNC: [True], COL_SUCCESS: [False]}
),
"sort": COL_LAST_PASS,
}
resp = requests.get(
grist_records_url(TABLE_BALS),
headers=GRIST_HEADERS,
params=params,
timeout=15,
)
resp.raise_for_status()
records = resp.json().get("records", [])
filtered_bals = []
for r in records:
fields = r["fields"]
# Récupération du timestamp Grist (ex: 1779124323.67)
last_pass_timestamp = fields.get("Date_derniere_passe")
# Si le timestamp existe, on valide l'écart de temps
if last_pass_timestamp is not None:
continue
# Si pas de date (jamais passée)
filtered_bals.append({"id": r["id"], **fields})
return filtered_bals
except requests.RequestException as e:
log.error(f"Erreur lecture Grist BALS : {e}")
return []
def grist_fetch_repass_bals() -> list[dict]:
"""
Retourne les BAL où Synchronisation = True.
Trié par 'Date_derniere_passe' (le plus vieux en premier).
Exclut les BAL dont la dernière passe a eu lieu il y a moins de MIN_INTERVAL_MINUTES.
"""
try:
# 1. Récupération triée via l'API Grist
params = {
# "limit": NB_CONCURENT_REPASS_BAL,
"filter": json.dumps(
{COL_PRIORITY: [False], COL_SYNC: [True], COL_SUCCESS: [True]}
),
"sort": COL_LAST_PASS,
}
resp = requests.get( resp = requests.get(
grist_records_url(TABLE_BALS), grist_records_url(TABLE_BALS),
headers=GRIST_HEADERS, headers=GRIST_HEADERS,
@@ -249,15 +312,12 @@ def grist_fetch_bals() -> list[dict]:
for r in records: for r in records:
fields = r["fields"] fields = r["fields"]
# Vérification de la condition de synchronisation globale
if fields.get(COL_SYNC) is not True:
continue
# Récupération du timestamp Grist (ex: 1779124323.67) # Récupération du timestamp Grist (ex: 1779124323.67)
last_pass_timestamp = fields.get("Date_derniere_passe") last_pass_timestamp = fields.get("Date_derniere_passe")
# Si le timestamp existe, on valide l'écart de temps # Si le timestamp existe, on valide l'écart de temps
if last_pass_timestamp is not None: if last_pass_timestamp is not None:
continue
try: try:
# Calcul du temps écoulé depuis la dernière passe # Calcul du temps écoulé depuis la dernière passe
elapsed_time = current_timestamp - float(last_pass_timestamp) elapsed_time = current_timestamp - float(last_pass_timestamp)
@@ -273,8 +333,10 @@ def grist_fetch_bals() -> list[dict]:
f"Format de timestamp invalide pour la BAL {r['id']} : {last_pass_timestamp}. " f"Format de timestamp invalide pour la BAL {r['id']} : {last_pass_timestamp}. "
f"Incluse par sécurité." f"Incluse par sécurité."
) )
else:
continue
# Si pas de date (jamais passée) ou si assez ancienne, on l'ajoute # Si assez ancienne, on l'ajoute
filtered_bals.append({"id": r["id"], **fields}) filtered_bals.append({"id": r["id"], **fields})
return filtered_bals return filtered_bals
@@ -284,7 +346,7 @@ def grist_fetch_bals() -> list[dict]:
return [] return []
def grist_priority_bals() -> dict: def grist_fetch_priority_bals() -> dict:
""" """
Retourne les BAL où Synchronisation = True. Retourne les BAL où Synchronisation = True.
Trié par 'Date_derniere_passe' (le plus vieux en premier). Trié par 'Date_derniere_passe' (le plus vieux en premier).
@@ -294,8 +356,9 @@ def grist_priority_bals() -> dict:
try: try:
# 1. Récupération triée via l'API Grist # 1. Récupération triée via l'API Grist
params = { params = {
"limit": 1, # "limit": NB_CONCURENT_PRIORITY_BAL,
"filter": json.dumps({COL_PRIORITY: [True], COL_SYNC: [True]}), "filter": json.dumps({COL_PRIORITY: [True], COL_SYNC: [True]}),
"sort": COL_LAST_PASS,
} }
resp = requests.get( resp = requests.get(
grist_records_url(TABLE_BALS), grist_records_url(TABLE_BALS),
@@ -366,69 +429,69 @@ def grist_add_historique(
# ───────────────────────────────────────────── # ─────────────────────────────────────────────
# Partition nouvelles BAL / re-passes # Partition nouvelles BAL / re-passes
# ───────────────────────────────────────────── # ─────────────────────────────────────────────
def partition_bals(rows: list[dict], available: int) -> list[dict]: # def partition_bals(rows: list[dict], available: int) -> list[dict]:
""" # """
Répartit les slots disponibles (`available`) entre nouvelles BAL et re-passes. # Répartit les slots disponibles (`available`) entre nouvelles BAL et re-passes.
- NB_RESERVE slots sont strictement exclusifs aux re-passes. # - NB_RESERVE slots sont strictement exclusifs aux re-passes.
- Le reste des slots est réparti selon le NEW_RATIO. # - Le reste des slots est réparti selon le NEW_RATIO.
- Si une file est vide, ses slots standards basculent sur l'autre. # - Si une file est vide, ses slots standards basculent sur l'autre.
- La réserve des repasses ne bascule JAMAIS vers les nouvelles. # - La réserve des repasses ne bascule JAMAIS vers les nouvelles.
""" # """
# On borne les slots disponibles par la capacité maximale de notre système # # On borne les slots disponibles par la capacité maximale de notre système
slots_dispos = min(available, N_WORKERS) # slots_dispos = min(available, N_WORKERS)
if slots_dispos <= 0: # if slots_dispos <= 0:
return [] # return []
# 1. Séparation des deux files # # 1. Séparation des deux files
nouvelles = [r for r in rows if not r.get(COL_NB_SYNCS)] # nouvelles = [r for r in rows if not r.get(COL_NB_SYNCS)]
repasses = [r for r in rows if r.get(COL_NB_SYNCS)] # repasses = [r for r in rows if r.get(COL_NB_SYNCS)]
# 2. Isolation de la réserve exclusive # # 2. Isolation de la réserve exclusive
# Si on a moins de slots dispos que la réserve demandée, la réserve prend tout # # Si on a moins de slots dispos que la réserve demandée, la réserve prend tout
slots_reserve_exclusive = min(NB_RESERVE, slots_dispos) # slots_reserve_exclusive = min(NB_RESERVE, slots_dispos)
slots_partageables = slots_dispos - slots_reserve_exclusive # slots_partageables = slots_dispos - slots_reserve_exclusive
# 3. Répartition théorique des slots partageables selon le NEW_RATIO # # 3. Répartition théorique des slots partageables selon le NEW_RATIO
quota_new = math.ceil(slots_partageables * NEW_RATIO) # quota_new = math.ceil(slots_partageables * NEW_RATIO)
quota_rep_standard = slots_partageables - quota_new # quota_rep_standard = slots_partageables - quota_new
# Le quota total théorique des repasses inclut sa réserve exclusive # # Le quota total théorique des repasses inclut sa réserve exclusive
quota_rep_total = quota_rep_standard + slots_reserve_exclusive # quota_rep_total = quota_rep_standard + slots_reserve_exclusive
# 4. Premier round de sélection (dans la limite des éléments disponibles) # # 4. Premier round de sélection (dans la limite des éléments disponibles)
picked_new = nouvelles[:quota_new] # picked_new = nouvelles[:quota_new]
picked_rep = repasses[:quota_rep_total] # picked_rep = repasses[:quota_rep_total]
# 5. Gestion des surplus (Bascule asymétrique) # # 5. Gestion des surplus (Bascule asymétrique)
surplus_new = quota_new - len(picked_new) # surplus_new = quota_new - len(picked_new)
# Le surplus des repasses ne peut venir QUE de leur quota standard, pas de la réserve # # Le surplus des repasses ne peut venir QUE de leur quota standard, pas de la réserve
surplus_rep_standard = max(0, quota_rep_standard - len(picked_rep)) # surplus_rep_standard = max(0, quota_rep_standard - len(picked_rep))
if surplus_new > 0: # if surplus_new > 0:
# Les nouvelles n'en veulent plus -> tout le rab va aux repasses # # Les nouvelles n'en veulent plus -> tout le rab va aux repasses
picked_rep += repasses[quota_rep_total : quota_rep_total + surplus_new] # picked_rep += repasses[quota_rep_total : quota_rep_total + surplus_new]
if surplus_rep_standard > 0: # if surplus_rep_standard > 0:
# Les repasses n'ont pas assez de jobs pour leur quota standard -> le rab va aux nouvelles # # Les repasses n'ont pas assez de jobs pour leur quota standard -> le rab va aux nouvelles
picked_new += nouvelles[quota_new : quota_new + surplus_rep_standard] # picked_new += nouvelles[quota_new : quota_new + surplus_rep_standard]
# 6. Sécurité : Les nouvelles ne doivent JAMAIS empiéter sur la réserve exclusive # # 6. Sécurité : Les nouvelles ne doivent JAMAIS empiéter sur la réserve exclusive
assert len(picked_new) <= slots_partageables # assert len(picked_new) <= slots_partageables
assert len(picked_new) + len(picked_rep) <= slots_dispos # assert len(picked_new) + len(picked_rep) <= slots_dispos
# 7. Intercalage des jobs (Alternative au zip pour garder l'ordre un par un) # # 7. Intercalage des jobs (Alternative au zip pour garder l'ordre un par un)
result = [] # result = []
i, j = 0, 0 # i, j = 0, 0
while i < len(picked_new) or j < len(picked_rep): # while i < len(picked_new) or j < len(picked_rep):
if i < len(picked_new): # if i < len(picked_new):
result.append(picked_new[i]) # result.append(picked_new[i])
i += 1 # i += 1
if j < len(picked_rep): # if j < len(picked_rep):
result.append(picked_rep[j]) # result.append(picked_rep[j])
j += 1 # j += 1
return result # return result
# ───────────────────────────────────────────── # ─────────────────────────────────────────────
@@ -608,51 +671,120 @@ def main():
dry = ARGS.dry dry = ARGS.dry
log.info("" * 60) log.info("" * 60)
log.info( log.info(
f"imapsync daemon démarré — workers={N_WORKERS} " f"imapsync daemon démarré — workers={NB_MAX_WORKERS} "
f"poll={POLL_INTERVAL}s new_ratio={NEW_RATIO:.0%}" f"poll={POLL_INTERVAL}s " + (" [MODE DRY]" if dry else "")
+ (" [MODE DRY]" if dry else "")
) )
log.info(f"Source : {HOST1} → Destination : {HOST2}") log.info(f"Source : {HOST1} → Destination : {HOST2}")
log.info("" * 60) log.info("" * 60)
if not dry:
Path(TMPDIR_CACHE).mkdir(parents=True, exist_ok=True) Path(TMPDIR_CACHE).mkdir(parents=True, exist_ok=True)
with ThreadPoolExecutor(max_workers=N_WORKERS) as pool: with ThreadPoolExecutor(max_workers=NB_MAX_WORKERS) as pool:
futures: dict = {} # Dictionnaires pour suivre les futures par catégorie
futures_new_bals: dict = {} # Futures pour les nouvelles BALs
futures_repass_bals: dict = {} # Futures pour les repasses
futures_priority_bals: dict = {} # Futures pour les prioritaires
while not _shutdown: while not _shutdown:
# Nettoyage des futures terminées # Nettoyage des futures terminées pour chaque catégorie
done = [f for f in list(futures) if f.done()] for category, futures_dict in [
("new_bals", futures_new_bals),
("repasses", futures_repass_bals),
("priority", futures_priority_bals),
]:
done = [f for f in list(futures_dict) if f.done()]
for f in done: for f in done:
email = futures.pop(f) email = futures_dict.pop(f)
try: try:
f.result() f.result()
except Exception as e: except Exception as e:
log.error(f"[{email}] Erreur non gérée : {e}") log.error(f"[{email}] Erreur non gérée ({category}): {e}")
# Renouvellement du token à chaque cycle de polling # Renouvellement du token à chaque cycle de polling
renew_oauth2_token() renew_oauth2_token(dry)
# Remplissage des slots disponibles # Remplissage des slots disponibles pour les nouvelles BALs
available = N_WORKERS - len(futures) available = NB_CONCURENT_NEW_BAL - len(futures_new_bals)
if available > 0: if available > 0:
rows = grist_fetch_bals() # Traitement des nouvelles BALs
rows_new_bals = grist_fetch_new_bals()
rows_new_bals = [
r
for r in rows_new_bals
if r.get(COL_EMAIL, "").strip() not in _running
]
for row in rows_new_bals:
email = row.get(COL_EMAIL, "").strip()
if (
email not in futures_new_bals
and email not in futures_repass_bals
and email not in futures_priority_bals
):
f = pool.submit(run_sync_job, row, dry)
futures_new_bals[f] = email
log.info(f"[{email}] Job (new_bals) soumis au pool")
available -= 1
if available <= 0:
break
# Traitement des repasses )
available = NB_CONCURENT_REPASS_BAL - len(futures_new_bals)
if available > 0:
rows_repasses = grist_fetch_repass_bals()
rows_repasses = [
r
for r in rows_repasses
if r.get(COL_EMAIL, "").strip() not in _running
]
for row in rows_repasses:
email = row.get(COL_EMAIL, "").strip()
if (
email not in futures_new_bals
and email not in futures_repass_bals
and email not in futures_priority_bals
):
f = pool.submit(run_sync_job, row, dry)
futures_repass_bals[f] = email
log.info(f"[{email}] Job (repasses) soumis au pool")
available -= 1
if available <= 0:
break
# Ajout des BALs prioritaires
available = NB_CONCURENT_PRIORITY_BAL - len(futures_new_bals)
if available > 0:
rows = grist_fetch_priority_bals()
rows = [r for r in rows if r.get(COL_EMAIL, "").strip() not in _running] rows = [r for r in rows if r.get(COL_EMAIL, "").strip() not in _running]
for row in partition_bals(rows, available): for row in rows:
email = row.get(COL_EMAIL, "").strip() email = row.get(COL_EMAIL, "").strip()
if (
email not in futures_new_bals
and email not in futures_repass_bals
and email not in futures_priority_bals
):
f = pool.submit(run_sync_job, row, dry) f = pool.submit(run_sync_job, row, dry)
futures[f] = email futures_priority_bals[f] = email
log.info(f"[{email}] Job soumis au pool") log.info(f"[{email}] Job (priority) soumis au pool")
available -= 1
# Ajout de la BAL prioritaire if available <= 0:
row = grist_priority_bals() break
email = row.get(COL_EMAIL, "").strip() # Attente conditionnelle : POLL_INTERVAL ou qu'un thread se termine
if email not in _running and email != "":
f = pool.submit(run_sync_job, row, dry)
futures[f] = email
log.info(f"[{email}] Job soumis au pool")
if not _shutdown: if not _shutdown:
all_futures = (
list(futures_new_bals.keys())
+ list(futures_repass_bals.keys())
+ list(futures_priority_bals.keys())
)
if all_futures:
# Attendre soit POLL_INTERVAL, soit qu'au moins un future se termine
wait(
all_futures,
timeout=POLL_INTERVAL,
return_when=FIRST_COMPLETED,
)
else:
# Si aucun future en cours, attendre simplement POLL_INTERVAL
time.sleep(POLL_INTERVAL) time.sleep(POLL_INTERVAL)
log.info("Daemon arrêté proprement.") log.info("Daemon arrêté proprement.")