Files
ArchiveMail/Archive_all_email.py
2026-03-06 10:21:41 +01:00

375 lines
13 KiB
Python

import win32com.client
from datetime import datetime, timedelta
from tqdm import tqdm
import time
import locale
# Imposta la localizzazione in italiano
try:
locale.setlocale(locale.LC_TIME, "it_IT.UTF-8")
except: # noqa: E722
try:
locale.setlocale(locale.LC_TIME, "ita_ita")
except: # noqa: E722
print("Locale italiano non impostato, uso nomi manuali.")
MESI_IT = {
1: "Gennaio", 2: "Febbraio", 3: "Marzo", 4: "Aprile",
5: "Maggio", 6: "Giugno", 7: "Luglio", 8: "Agosto",
9: "Settembre", 10: "Ottobre", 11: "Novembre", 12: "Dicembre"
}
# Costanti Outlook per le cartelle speciali da escludere
FOLDER_DELETED_ITEMS = 3 # Posta eliminata
FOLDER_JUNK_EMAIL = 23 # Posta indesiderata
FOLDER_OUTBOX = 4 # Posta in uscita (bozze in transito)
FOLDER_DRAFTS = 16 # Bozze
EXCLUDED_DEFAULT_FOLDERS = {FOLDER_DELETED_ITEMS, FOLDER_JUNK_EMAIL, FOLDER_OUTBOX, FOLDER_DRAFTS}
# Per la cartella Posta inviata usiamo SentOn invece di ReceivedTime
SENT_ITEMS_FOLDER = 5
# DefaultItemType == 0 → cartella di tipo Mail; altri valori → Calendar, Contacts, Tasks, ecc.
OL_MAIL_ITEM = 0
def get_user_email(namespace):
"""Recupera l'indirizzo email primario dell'utente corrente."""
try:
exchange_user = namespace.CurrentUser.AddressEntry.GetExchangeUser()
if exchange_user:
return exchange_user.PrimarySmtpAddress
except Exception:
pass
# Fallback: legge dallo store predefinito
try:
return namespace.Stores.Item(1).DisplayName
except Exception:
pass
raise RuntimeError("Impossibile determinare l'email dell'utente corrente.")
def find_personal_store(namespace):
"""
Restituisce lo store della mailbox online (live) dell'utente.
Usa GetDefaultFolder(6) = Posta in arrivo, che appartiene sempre
alla mailbox primaria online, mai all'archivio.
"""
try:
inbox = namespace.GetDefaultFolder(6)
store = inbox.Store
print(f" Store personale (da Posta in arrivo): {store.DisplayName}")
return store
except Exception as e:
raise RuntimeError(f"Impossibile determinare lo store della mailbox online: {e}")
def get_all_folders(namespace, user_email, archive_root):
"""
Restituisce una lista di tuple (folder, filter_attr, time_attr)
SOLO per la mailbox personale dell'utente, escludendo:
- Cartelle non-mail (Calendar, Contacts, Tasks, ecc.)
- Posta eliminata, Posta indesiderata, Bozze, Posta in uscita
- La cartella archivio online stessa (e tutte le sue sottocartelle)
"""
# --- Individua lo store personale ---
personal_store = find_personal_store(namespace)
print(f" Store personale: {personal_store.DisplayName}")
# --- EntryID dell'archivio online (passato direttamente da main, certo al 100%) ---
try:
archive_entry_id = archive_root.EntryID
except Exception:
archive_entry_id = None
# --- EntryID cartelle speciali da escludere (via costanti Outlook) ---
excluded_entry_ids = set()
if archive_entry_id:
excluded_entry_ids.add(archive_entry_id)
for ftype in EXCLUDED_DEFAULT_FOLDERS:
try:
excluded_entry_ids.add(namespace.GetDefaultFolder(ftype).EntryID)
except Exception:
pass
# --- EntryID Posta inviata (usa SentOn) ---
try:
sent_entry_id = namespace.GetDefaultFolder(SENT_ITEMS_FOLDER).EntryID
except Exception:
sent_entry_id = None
result = []
def walk_folders(folder):
entry_id = folder.EntryID
# Escludi per EntryID (archivio root, posta eliminata, ecc.)
# Nota: escludendo la root dell'archivio, walk_folders non scende mai
# nelle sue sottocartelle — non serve controllare i figli separatamente.
if entry_id in excluded_entry_ids:
return
# Escludi cartelle nascoste di sistema tramite proprietà MAPI PR_ATTR_HIDDEN
try:
pa = folder.PropertyAccessor
hidden = pa.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x10F4000B")
if hidden:
return
except Exception:
pass
# Escludi cartelle non-mail (Calendar, Contacts, Tasks, Notes, ecc.)
try:
if folder.DefaultItemType != OL_MAIL_ITEM:
return
except Exception:
pass
# Attributo temporale corretto
if entry_id == sent_entry_id:
time_attr = "SentOn"
filter_attr = "[SentOn]"
else:
time_attr = "ReceivedTime"
filter_attr = "[ReceivedTime]"
result.append((folder, filter_attr, time_attr))
# Ricorsione nelle sottocartelle
try:
for i in range(1, folder.Folders.Count + 1):
walk_folders(folder.Folders.Item(i))
except Exception:
pass
try:
root_folder = personal_store.GetRootFolder()
for i in range(1, root_folder.Folders.Count + 1):
walk_folders(root_folder.Folders.Item(i))
except Exception as e:
print(f"Errore nella lettura delle cartelle: {e}")
return result, personal_store
# MarkForDownload: 0=scaricato, 1=da scaricare, 2=solo header
OL_REMOTE_HEADER = 2
OL_MARKED_FOR_DOWNLOAD = 1
def force_download_folder(folder, namespace, timeout=120):
"""
Forza il download completo di tutti gli item che sono solo header (cache parziale).
1. Marca tutti gli item header-only come "da scaricare"
2. Triggera SendAndReceive
3. Attende il completamento con timeout
"""
marked = 0
try:
items = folder.Items
for i in range(1, items.Count + 1):
try:
item = items.Item(i)
if hasattr(item, "MarkForDownload") and item.MarkForDownload == OL_REMOTE_HEADER:
item.MarkForDownload = OL_MARKED_FOR_DOWNLOAD
item.Save()
marked += 1
except Exception:
pass
except Exception:
pass
if marked == 0:
return # Tutto già scaricato, niente da fare
print(f"{marked} item da scaricare in '{folder.Name}', avvio sincronizzazione...")
# Triggera SendAndReceive asincrono
try:
namespace.SendAndReceive(False)
except Exception:
pass
# Aspetta che gli item siano tutti scaricati (polling con timeout)
deadline = time.time() + timeout
with tqdm(total=marked, desc=f" Download '{folder.Name}'", unit="mail", colour="cyan") as pbar:
last_remaining = marked
while time.time() < deadline:
try:
remaining = sum(
1 for i in range(1, folder.Items.Count + 1)
if hasattr(folder.Items.Item(i), "MarkForDownload")
and folder.Items.Item(i).MarkForDownload != 0
)
except Exception:
remaining = 0
done = marked - remaining
pbar.n = done
pbar.refresh()
if remaining == 0:
break
last_remaining = remaining
time.sleep(2)
else:
print(f" ⚠ Timeout: {last_remaining} item non ancora scaricati, si procede comunque.")
def get_or_create_folder(parent, name):
"""Restituisce una sottocartella esistente o la crea."""
try:
return parent.Folders.Item(name)
except Exception:
return parent.Folders.Add(name)
def archive_folder(source_folder, filter_attr, time_attr,
archive_root, cutoff_date, months_limit, root_entry_id):
"""Archivia le mail più vecchie di cutoff_date dalla source_folder."""
filter_date_str = cutoff_date.strftime("%d/%m/%Y %H:%M")
filter_str = f"{filter_attr} < '{filter_date_str}'"
try:
items = source_folder.Items.Restrict(filter_str)
items.Sort(filter_attr, True)
except Exception as e:
print(f" Filtro non applicabile su '{source_folder.Name}': {e}")
return
total_items = items.Count
if total_items == 0:
print(f" Nessuna mail da archiviare in '{source_folder.Name}'.")
return
# --- Ricostruisce il percorso relativo rispetto alla root dello store ---
# Ci fermiamo esplicitamente quando il parent è la root (EntryID noto),
# evitando di includere il nome della mailbox (es. "utente@azienda.com")
path_parts = []
f = source_folder
while True:
try:
parent = f.Parent
path_parts.insert(0, f.Name)
if parent.EntryID == root_entry_id:
break
f = parent
except Exception:
break
archived_count = 0
desc_label = "/".join(path_parts) if path_parts else source_folder.Name
with tqdm(total=total_items, desc=f"Archiviazione '{desc_label}'",
unit="mail", colour="green") as pbar:
for i in range(total_items, 0, -1):
try:
item = items.Item(i)
if not hasattr(item, time_attr):
pbar.update(1)
continue
rt = getattr(item, time_attr)
received_time = datetime(rt.year, rt.month, rt.day, rt.hour, rt.minute)
anno_str = str(received_time.year)
nome_mese = MESI_IT[received_time.month]
month_folder_name = f"{received_time.month:02d}-{nome_mese}"
pbar.set_description(f"'{desc_label}' - {nome_mese} {anno_str}")
# --- Costruisce la struttura nell'archivio ---
# Archivio → percorso relativo sorgente → Anno → Mese
target = archive_root
for part in path_parts:
target = get_or_create_folder(target, part)
target = get_or_create_folder(target, anno_str)
target = get_or_create_folder(target, month_folder_name)
# Sposta con retry
archived_item = None
for tentativo in range(3):
try:
archived_item = item.Move(target)
if archived_item:
archived_item.Save()
time.sleep(0.4)
break
except Exception:
time.sleep(1)
if archived_item:
archived_count += 1
pbar.set_postfix(successo=archived_count)
else:
pbar.set_postfix(error="Move fallito")
except Exception as e:
pbar.set_postfix(err=str(e)[:20])
pbar.update(1)
print(f"{archived_count}/{total_items} mail archiviate da '{desc_label}'.")
def main():
print("Connessione a Outlook in corso...")
try:
outlook = win32com.client.Dispatch("Outlook.Application")
namespace = outlook.GetNamespace("MAPI")
except Exception as e:
print(f"Errore connessione: {e}")
return
# --- MODIFICA 1: ricava ARCHIVE_NAME dall'email dell'utente ---
try:
user_email = get_user_email(namespace)
ARCHIVE_NAME = f"Archivio online - {user_email}"
print(f"Utente rilevato: {user_email}")
print(f"Archivio target: {ARCHIVE_NAME}")
except RuntimeError as e:
print(f"Errore: {e}")
return
try:
archive_root = namespace.Folders.Item(ARCHIVE_NAME)
except Exception:
print(f"Cartella archivio '{ARCHIVE_NAME}' non trovata in Outlook.")
return
while True:
stringa_months_limit = input("\nQuanti mesi vuoi tenere in linea? (default 3): ").strip() or "3"
try:
MONTHS_LIMIT = int(stringa_months_limit)
if MONTHS_LIMIT > 0:
break
print(" Inserisci un numero intero maggiore di zero.")
except ValueError:
print(f" '{stringa_months_limit}' non è un numero valido. Riprova.")
cutoff_date = datetime.now() - timedelta(days=MONTHS_LIMIT * 30)
print(f"\nVerranno archiviate le mail precedenti al: {cutoff_date.strftime('%d/%m/%Y')}\n")
# --- MODIFICA 2: recupera TUTTE le cartelle della mailbox ---
folders_to_process, personal_store = get_all_folders(namespace, user_email, archive_root)
print(f"Cartelle trovate da elaborare: {len(folders_to_process)}")
for f, _, _ in folders_to_process:
print(f"{f.Name}")
confirm = input("\nProcedere con l'archiviazione? (s/n): ").strip().lower()
if confirm != "s":
print("Operazione annullata.")
return
root_entry_id = personal_store.GetRootFolder().EntryID
for source_folder, filter_attr, time_attr in folders_to_process:
print(f"\n--- Cartella: {source_folder.Name} ---")
force_download_folder(source_folder, namespace)
archive_folder(source_folder, filter_attr, time_attr,
archive_root, cutoff_date, MONTHS_LIMIT, root_entry_id)
print("\nOperazione conclusa con successo.")
if __name__ == "__main__":
main()