import win32com.client from datetime import datetime, timedelta from tqdm import tqdm import time import locale # Imposta la localizzazione in italiano try: locale.setlocale(locale.LC_TIME, "it_IT.UTF-8") except: # noqa: E722 try: locale.setlocale(locale.LC_TIME, "ita_ita") except: # noqa: E722 print("Locale italiano non impostato, uso nomi manuali.") MESI_IT = { 1: "Gennaio", 2: "Febbraio", 3: "Marzo", 4: "Aprile", 5: "Maggio", 6: "Giugno", 7: "Luglio", 8: "Agosto", 9: "Settembre", 10: "Ottobre", 11: "Novembre", 12: "Dicembre" } # Costanti Outlook per le cartelle speciali da escludere FOLDER_DELETED_ITEMS = 3 # Posta eliminata FOLDER_JUNK_EMAIL = 23 # Posta indesiderata FOLDER_OUTBOX = 4 # Posta in uscita (bozze in transito) FOLDER_DRAFTS = 16 # Bozze EXCLUDED_DEFAULT_FOLDERS = {FOLDER_DELETED_ITEMS, FOLDER_JUNK_EMAIL, FOLDER_OUTBOX, FOLDER_DRAFTS} # Per la cartella Posta inviata usiamo SentOn invece di ReceivedTime SENT_ITEMS_FOLDER = 5 # DefaultItemType == 0 → cartella di tipo Mail; altri valori → Calendar, Contacts, Tasks, ecc. OL_MAIL_ITEM = 0 def get_user_email(namespace): """Recupera l'indirizzo email primario dell'utente corrente.""" try: exchange_user = namespace.CurrentUser.AddressEntry.GetExchangeUser() if exchange_user: return exchange_user.PrimarySmtpAddress except Exception: pass # Fallback: legge dallo store predefinito try: return namespace.Stores.Item(1).DisplayName except Exception: pass raise RuntimeError("Impossibile determinare l'email dell'utente corrente.") def find_personal_store(namespace): """ Restituisce lo store della mailbox online (live) dell'utente. Usa GetDefaultFolder(6) = Posta in arrivo, che appartiene sempre alla mailbox primaria online, mai all'archivio. """ try: inbox = namespace.GetDefaultFolder(6) store = inbox.Store print(f" Store personale (da Posta in arrivo): {store.DisplayName}") return store except Exception as e: raise RuntimeError(f"Impossibile determinare lo store della mailbox online: {e}") def get_all_folders(namespace, user_email, archive_root): """ Restituisce una lista di tuple (folder, filter_attr, time_attr) SOLO per la mailbox personale dell'utente, escludendo: - Cartelle non-mail (Calendar, Contacts, Tasks, ecc.) - Posta eliminata, Posta indesiderata, Bozze, Posta in uscita - La cartella archivio online stessa (e tutte le sue sottocartelle) """ # --- Individua lo store personale --- personal_store = find_personal_store(namespace) print(f" Store personale: {personal_store.DisplayName}") # --- EntryID dell'archivio online (passato direttamente da main, certo al 100%) --- try: archive_entry_id = archive_root.EntryID except Exception: archive_entry_id = None # --- EntryID cartelle speciali da escludere (via costanti Outlook) --- excluded_entry_ids = set() if archive_entry_id: excluded_entry_ids.add(archive_entry_id) for ftype in EXCLUDED_DEFAULT_FOLDERS: try: excluded_entry_ids.add(namespace.GetDefaultFolder(ftype).EntryID) except Exception: pass # --- EntryID Posta inviata (usa SentOn) --- try: sent_entry_id = namespace.GetDefaultFolder(SENT_ITEMS_FOLDER).EntryID except Exception: sent_entry_id = None result = [] def walk_folders(folder): entry_id = folder.EntryID # Escludi per EntryID (archivio root, posta eliminata, ecc.) # Nota: escludendo la root dell'archivio, walk_folders non scende mai # nelle sue sottocartelle — non serve controllare i figli separatamente. if entry_id in excluded_entry_ids: return # Escludi cartelle nascoste di sistema tramite proprietà MAPI PR_ATTR_HIDDEN try: pa = folder.PropertyAccessor hidden = pa.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x10F4000B") if hidden: return except Exception: pass # Escludi cartelle non-mail (Calendar, Contacts, Tasks, Notes, ecc.) try: if folder.DefaultItemType != OL_MAIL_ITEM: return except Exception: pass # Attributo temporale corretto if entry_id == sent_entry_id: time_attr = "SentOn" filter_attr = "[SentOn]" else: time_attr = "ReceivedTime" filter_attr = "[ReceivedTime]" result.append((folder, filter_attr, time_attr)) # Ricorsione nelle sottocartelle try: for i in range(1, folder.Folders.Count + 1): walk_folders(folder.Folders.Item(i)) except Exception: pass try: root_folder = personal_store.GetRootFolder() for i in range(1, root_folder.Folders.Count + 1): walk_folders(root_folder.Folders.Item(i)) except Exception as e: print(f"Errore nella lettura delle cartelle: {e}") return result, personal_store # MarkForDownload: 0=scaricato, 1=da scaricare, 2=solo header OL_REMOTE_HEADER = 2 OL_MARKED_FOR_DOWNLOAD = 1 def force_download_folder(folder, namespace, timeout=120): """ Forza il download completo di tutti gli item che sono solo header (cache parziale). 1. Marca tutti gli item header-only come "da scaricare" 2. Triggera SendAndReceive 3. Attende il completamento con timeout """ marked = 0 try: items = folder.Items for i in range(1, items.Count + 1): try: item = items.Item(i) if hasattr(item, "MarkForDownload") and item.MarkForDownload == OL_REMOTE_HEADER: item.MarkForDownload = OL_MARKED_FOR_DOWNLOAD item.Save() marked += 1 except Exception: pass except Exception: pass if marked == 0: return # Tutto già scaricato, niente da fare print(f" ↓ {marked} item da scaricare in '{folder.Name}', avvio sincronizzazione...") # Triggera SendAndReceive asincrono try: namespace.SendAndReceive(False) except Exception: pass # Aspetta che gli item siano tutti scaricati (polling con timeout) deadline = time.time() + timeout with tqdm(total=marked, desc=f" Download '{folder.Name}'", unit="mail", colour="cyan") as pbar: last_remaining = marked while time.time() < deadline: try: remaining = sum( 1 for i in range(1, folder.Items.Count + 1) if hasattr(folder.Items.Item(i), "MarkForDownload") and folder.Items.Item(i).MarkForDownload != 0 ) except Exception: remaining = 0 done = marked - remaining pbar.n = done pbar.refresh() if remaining == 0: break last_remaining = remaining time.sleep(2) else: print(f" ⚠ Timeout: {last_remaining} item non ancora scaricati, si procede comunque.") def get_or_create_folder(parent, name): """Restituisce una sottocartella esistente o la crea.""" try: return parent.Folders.Item(name) except Exception: return parent.Folders.Add(name) def archive_folder(source_folder, filter_attr, time_attr, archive_root, cutoff_date, months_limit, root_entry_id): """Archivia le mail più vecchie di cutoff_date dalla source_folder.""" filter_date_str = cutoff_date.strftime("%d/%m/%Y %H:%M") filter_str = f"{filter_attr} < '{filter_date_str}'" try: items = source_folder.Items.Restrict(filter_str) items.Sort(filter_attr, True) except Exception as e: print(f" Filtro non applicabile su '{source_folder.Name}': {e}") return total_items = items.Count if total_items == 0: print(f" Nessuna mail da archiviare in '{source_folder.Name}'.") return # --- Ricostruisce il percorso relativo rispetto alla root dello store --- # Ci fermiamo esplicitamente quando il parent è la root (EntryID noto), # evitando di includere il nome della mailbox (es. "utente@azienda.com") path_parts = [] f = source_folder while True: try: parent = f.Parent path_parts.insert(0, f.Name) if parent.EntryID == root_entry_id: break f = parent except Exception: break archived_count = 0 desc_label = "/".join(path_parts) if path_parts else source_folder.Name with tqdm(total=total_items, desc=f"Archiviazione '{desc_label}'", unit="mail", colour="green") as pbar: for i in range(total_items, 0, -1): try: item = items.Item(i) if not hasattr(item, time_attr): pbar.update(1) continue rt = getattr(item, time_attr) received_time = datetime(rt.year, rt.month, rt.day, rt.hour, rt.minute) anno_str = str(received_time.year) nome_mese = MESI_IT[received_time.month] month_folder_name = f"{received_time.month:02d}-{nome_mese}" pbar.set_description(f"'{desc_label}' - {nome_mese} {anno_str}") # --- Costruisce la struttura nell'archivio --- # Archivio → percorso relativo sorgente → Anno → Mese target = archive_root for part in path_parts: target = get_or_create_folder(target, part) target = get_or_create_folder(target, anno_str) target = get_or_create_folder(target, month_folder_name) # Sposta con retry archived_item = None for tentativo in range(3): try: archived_item = item.Move(target) if archived_item: archived_item.Save() time.sleep(0.4) break except Exception: time.sleep(1) if archived_item: archived_count += 1 pbar.set_postfix(successo=archived_count) else: pbar.set_postfix(error="Move fallito") except Exception as e: pbar.set_postfix(err=str(e)[:20]) pbar.update(1) print(f" → {archived_count}/{total_items} mail archiviate da '{desc_label}'.") def main(): print("Connessione a Outlook in corso...") try: outlook = win32com.client.Dispatch("Outlook.Application") namespace = outlook.GetNamespace("MAPI") except Exception as e: print(f"Errore connessione: {e}") return # --- MODIFICA 1: ricava ARCHIVE_NAME dall'email dell'utente --- try: user_email = get_user_email(namespace) ARCHIVE_NAME = f"Archivio online - {user_email}" print(f"Utente rilevato: {user_email}") print(f"Archivio target: {ARCHIVE_NAME}") except RuntimeError as e: print(f"Errore: {e}") return try: archive_root = namespace.Folders.Item(ARCHIVE_NAME) except Exception: print(f"Cartella archivio '{ARCHIVE_NAME}' non trovata in Outlook.") return while True: stringa_months_limit = input("\nQuanti mesi vuoi tenere in linea? (default 3): ").strip() or "3" try: MONTHS_LIMIT = int(stringa_months_limit) if MONTHS_LIMIT > 0: break print(" Inserisci un numero intero maggiore di zero.") except ValueError: print(f" '{stringa_months_limit}' non è un numero valido. Riprova.") cutoff_date = datetime.now() - timedelta(days=MONTHS_LIMIT * 30) print(f"\nVerranno archiviate le mail precedenti al: {cutoff_date.strftime('%d/%m/%Y')}\n") # --- MODIFICA 2: recupera TUTTE le cartelle della mailbox --- folders_to_process, personal_store = get_all_folders(namespace, user_email, archive_root) print(f"Cartelle trovate da elaborare: {len(folders_to_process)}") for f, _, _ in folders_to_process: print(f" • {f.Name}") confirm = input("\nProcedere con l'archiviazione? (s/n): ").strip().lower() if confirm != "s": print("Operazione annullata.") return root_entry_id = personal_store.GetRootFolder().EntryID for source_folder, filter_attr, time_attr in folders_to_process: print(f"\n--- Cartella: {source_folder.Name} ---") force_download_folder(source_folder, namespace) archive_folder(source_folder, filter_attr, time_attr, archive_root, cutoff_date, MONTHS_LIMIT, root_entry_id) print("\nOperazione conclusa con successo.") if __name__ == "__main__": main()