diff --git a/tag_imap_categories.py b/tag_imap_categories.py new file mode 100644 index 0000000..e30cb16 --- /dev/null +++ b/tag_imap_categories.py @@ -0,0 +1,195 @@ +import email +import itertools +from datetime import datetime +from imaplib import IMAP4_SSL + +import httpx + +MAIL_SERVER = "imap.numerique.gouv.fr" +OX_SERVER = "messagerie.numerique.gouv.fr" + +# Authentication +# via force password +# client post mailboxes force_password sdis66.fr eric.belgioino jnhskjdfnskflfksdlfks +EMAIL_USER = "eric.belgioino@sdis66.fr" +EMAIL_PASS = "jnhskjdfnskflfksdlfks" + +# via app_passwords +API_USER = "41e0d345-5c18-4c7b-8e79-42884eda45c6" +API_PASS = "ddkv-rciv-cwjr-jdkk" + + +def ox_client(): + client = httpx.Client(base_url=f"https://{OX_SERVER}/") + data = { + "name": API_USER, + "password": API_PASS, + "authId": f"{datetime.now().isoformat()}", + "client": "ox-python", + "clientToken": "ox-python", + } + response = client.post("/appsuite/api/login?action=login", data=data) + return client, response.json() + + +def get_jslob(client, session): + jslob = client.get(f"/appsuite/api/jslob?action=all&session={session['session']}") + return jslob.json()["data"] + + +def get_ox_categories(jslob): + for d in jslob: + if d["id"] != "io.ox/core": + continue + if "categories" not in d["tree"]: + continue + return d["tree"]["categories"] + + +def get_ox_identifier(jslob): + for d in jslob: + if d["id"] != "io.ox/core": + continue + if "identifier" not in d["tree"]: + continue + return d["tree"]["identifier"] + + +def get_new_categories(categories, identifier, keywords): + new_user_categories = [] + serialNumber = categories["serialNumber"] + colors = get_color() + for keyword in keywords: + if keyword not in [c["name"] for c in categories["userCategories"]]: + serialNumber += 1 + new_id = f"$ct_user_{str(serialNumber).zfill(4)}_{identifier}" + new_user_category = { + "id": new_id, + "name": keyword, + "color": next(colors), + "icon": "bi/star.svg", + } + new_user_categories.append(new_user_category) + new_categories = { + "categories": { + "userCategories": categories["userCategories"] + new_user_categories, + "serialNumber": serialNumber, + } + } + return new_categories + + +def put_new_categories(client, session, new_categories): + print(new_categories) + response = client.put( + f"/appsuite/api/jslob?action=update&session={session['session']}&id=io.ox/core", + json=new_categories, + ) + return response + + +def get_color(): + colors = itertools.cycle( + [ + "#ff2968", + "#ff9500", + "#ffcc00", + "#63da38", + "#16adf8", + "#d6dfff", + "#cc73e1", + "#a2845e", + "#707070", + ] + ) + for color in colors: + yield color + + +def get_keywords(): + with IMAP4_SSL("imap.numerique.gouv.fr") as imap: + imap.login(EMAIL_USER, EMAIL_PASS) + imap.select("Virtual/All") + status, message_ids = imap.search(None, '(HEADER Keywords "")') + + ids = message_ids[0].split() + ids_query = ",".join([f"{int(a)}" for a in ids]) + status, headers = imap.fetch(ids_query, "(BODY.PEEK[HEADER])") + keyword_store = {} + count = 0 + # ici on retrouve des messages avec juste une parenthèse fermante, et c'est moche, donc on le retire. + headers = [h for h in headers if h != b")"] + for header in headers: + try: + raw_headers = header[1] + msg = email.message_from_bytes(raw_headers) + keywords = set(msg["Keywords"].split(",")) + for keyword in keywords: + keyword_store[keyword] = keyword_store.get(keyword, []) + [ + ids[count] + ] + except Exception as e: + print(f"#### msg {ids[count]}") + print(f"#### cassé !: {e}") + finally: + count += 1 + imap.logout() + return keyword_store + + +def associations(categories, keywords): + assoc = {} + for keyword in keywords: + for category in categories["categories"]["userCategories"]: + if keyword == category["name"] and category["id"][0:3] == "$ct": + assoc[keyword] = category["id"] + return assoc + + +def patch_emails(keywords, categories): + print(categories) + tags_for_category = associations(categories, keywords) + + with IMAP4_SSL("imap.numerique.gouv.fr") as imap: + imap.login(EMAIL_USER, EMAIL_PASS) + imap.select("Virtual/All") + for keyword in keywords.keys(): + tag = tags_for_category[keyword] + ids = keywords[keyword] + ids_str = ",".join([i.decode("utf-8") for i in ids]) + # Appliquer le tag OX aux mails + imap.store(ids_str, "+FLAGS", tag) + + pass + + +def main(): + # get keywords + print("\t => get keywords :") + keywords = get_keywords() + print(f"{keywords}") + + # get ox categories + print("\t => get ox categories") + client, session = ox_client() + jslob = get_jslob(client, session) + categories = get_ox_categories(jslob) + identifier = get_ox_identifier(jslob) + print(f"{categories}") + + patch_emails(keywords, categories) + + # create new categories + # print("create new ox categories") + # new_categories = get_new_categories(categories, identifier, keywords.keys()) + # response = put_new_categories(client, session, new_categories) + # patch emails + # tag_for_category = associations(new_categories, keywords.keys()) + # patch_emails(keywords, new_categories) + + # say goodbye + print("goodbye") + + +if __name__ == "__main__": + main()