This commit is contained in:
2026-01-11 14:57:32 +01:00
parent 509e557453
commit c175ca6fe2

View File

@@ -1,13 +1,30 @@
import win32com.client import win32com.client
import os import os
import hashlib import hashlib
import logging
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Dict
from enum import IntEnum
import time import time
from nicegui import ui, run from nicegui import ui, run
# --- CONFIGURAZIONE LOGGING ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# --- COSTANTI OUTLOOK ---
class OutlookConstants(IntEnum):
"""Costanti Outlook per migliorare leggibilità"""
FOLDER_INBOX = 6
MAIL_CLASS = 43
ATTACHMENT_TYPE_FILE = 1
# --- LOGICA DI BACKEND --- # --- LOGICA DI BACKEND ---
def get_file_hash(file_path): def get_file_hash(file_path: str) -> str:
"""Calcola l'MD5 per identificare file identici ed evitare duplicati.""" """Calcola l'MD5 per identificare file identici ed evitare duplicati."""
hasher = hashlib.md5() hasher = hashlib.md5()
with open(file_path, 'rb') as f: with open(file_path, 'rb') as f:
@@ -16,147 +33,170 @@ def get_file_hash(file_path):
return hasher.hexdigest() return hasher.hexdigest()
class ArchiverGUI: class ArchiverGUI:
def __init__(self): """Gestisce lo stato e la logica di archiviazione delle email Outlook."""
self.running = False
self.progress = 0.0
self.current_mail = "In attesa di avvio..."
self.mesi_it = {
1: "Gennaio", 2: "Febbraio", 3: "Marzo", 4: "Aprile", 5: "Maggio", 6: "Giugno",
7: "Luglio", 8: "Agosto", 9: "Settembre", 10: "Ottobre", 11: "Novembre", 12: "Dicembre"
}
async def start_archiving(self, archive_name, onedrive_path, months_limit): MONTH_NAMES = {
1: "Gennaio", 2: "Febbraio", 3: "Marzo", 4: "Aprile",
5: "Maggio", 6: "Giugno", 7: "Luglio", 8: "Agosto",
9: "Settembre", 10: "Ottobre", 11: "Novembre", 12: "Dicembre"
}
def __init__(self):
self.running: bool = False
self.progress: float = 0.0
self.current_mail: str = "In attesa di avvio..."
async def start_archiving(self, archive_name: str, onedrive_path: str, months_limit: int) -> None:
"""Avvia il processo di archiviazione in background."""
self.running = True self.running = True
self.progress = 0.0 self.progress = 0.0
ui.notify('Connessione a Outlook in corso...', color='info') ui.notify('Connessione a Outlook in corso...', color='info')
# Avvia la logica in un thread dedicato per non bloccare l'interfaccia # Avvia la logica in un thread dedicato per non bloccare l'interfaccia
await run.io_bound(self.run_logic, archive_name, onedrive_path, months_limit) await run.io_bound(self.run_logic, archive_name, onedrive_path, months_limit)
self.running = False self.running = False
if self.progress >= 1.0: if self.progress >= 1.0:
self.current_mail = "Archiviazione completata con successo!" self.current_mail = "Archiviazione completata con successo!"
ui.notify('Processo terminato!', type='positive') ui.notify('Processo terminato!', type='positive')
def run_logic(self, archive_name, onedrive_path, months_limit): def run_logic(self, archive_name: str, onedrive_path: str, months_limit: int) -> None:
"""Logica principale di archiviazione delle email."""
try: try:
if not os.path.exists(onedrive_path): if not os.path.exists(onedrive_path):
os.makedirs(onedrive_path) os.makedirs(onedrive_path)
logger.info(f"Cartella OneDrive creata: {onedrive_path}")
outlook = win32com.client.Dispatch("Outlook.Application") outlook = win32com.client.Dispatch("Outlook.Application")
namespace = outlook.GetNamespace("MAPI") namespace = outlook.GetNamespace("MAPI")
inbox = namespace.GetDefaultFolder(6) # 6 = OlFolderInbox inbox = namespace.GetDefaultFolder(OutlookConstants.FOLDER_INBOX)
try: try:
archive_root = namespace.Folders.Item(archive_name) archive_root = namespace.Folders.Item(archive_name)
except Exception: except Exception as e:
self.current_mail = f"ERRORE: Archivio '{archive_name}' non trovato." error_msg = f"ERRORE: Archivio '{archive_name}' non trovato."
logger.error(error_msg, exc_info=e)
self.current_mail = error_msg
self.running = False self.running = False
return return
# Calcolo data limite per il filtro # Calcolo data limite per il filtro
cutoff_date = datetime.now() - timedelta(days=int(months_limit) * 30) cutoff_date = datetime.now() - timedelta(days=int(months_limit) * 30)
filter_str = f"[ReceivedTime] < '{cutoff_date.strftime('%d/%m/%Y %H:%M')}'" filter_str = f"[ReceivedTime] < '{cutoff_date.strftime('%d/%m/%Y %H:%M')}'"
items = inbox.Items.Restrict(filter_str) items = inbox.Items.Restrict(filter_str)
items.Sort("[ReceivedTime]", True) items.Sort("[ReceivedTime]", True)
total = items.Count total = items.Count
if total == 0: if total == 0:
self.current_mail = "Nessuna mail trovata con i criteri selezionati." self.current_mail = "Nessuna mail trovata con i criteri selezionati."
self.progress = 1.0 self.progress = 1.0
logger.info("Nessuna email da archiviare")
return return
processed_files = {} logger.info(f"Inizio archiviazione di {total} email")
processed_files: Dict[str, str] = {}
# Ciclo dal fondo verso l'inizio per non sballare gli indici di Outlook # Ciclo dal fondo verso l'inizio per non sballare gli indici di Outlook
for i in range(total, 0, -1): for i in range(total, 0, -1):
if not self.running: if not self.running:
self.current_mail = "Processo interrotto dall'utente." self.current_mail = "Processo interrotto dall'utente."
logger.info("Archiviazione interrotta dall'utente")
break break
try: try:
item = items.Item(i) item = items.Item(i)
rt = item.ReceivedTime rt = item.ReceivedTime
received_time = datetime(rt.year, rt.month, rt.day, rt.hour, rt.minute) received_time = datetime(rt.year, rt.month, rt.day, rt.hour, rt.minute)
self.current_mail = f"[{total-i+1}/{total}] {item.Subject[:40]}..."
self.progress = float((total - i + 1) / total)
# 1. ELIMINAZIONE ALLEGATI E AGGIUNTA LINK (Mentre è ancora in Inbox) current_idx = total - i + 1
# Questo garantisce i permessi di scrittura necessari per cancellare i file subject_preview = item.Subject[:40] if item.Subject else "(nessun oggetto)"
self.current_mail = f"[{current_idx}/{total}] {subject_preview}..."
self.progress = float(current_idx / total)
# 1. ELIMINAZIONE ALLEGATI E AGGIUNTA LINK
self.process_attachments(item, onedrive_path, received_time, processed_files) self.process_attachments(item, onedrive_path, received_time, processed_files)
item.Save() item.Save()
# 2. PREPARAZIONE CARTELLA DESTINAZIONE # 2. PREPARAZIONE CARTELLA DESTINAZIONE
anno_str = str(received_time.year) year_str = str(received_time.year)
nome_mese = self.mesi_it[received_time.month] month_name = self.MONTH_NAMES[received_time.month]
month_folder_name = f"{received_time.month:02d}-{nome_mese}" month_folder_name = f"{received_time.month:02d}-{month_name}"
y_f = self.get_or_create_folder(archive_root, anno_str) year_folder = self.get_or_create_folder(archive_root, year_str)
target_folder = self.get_or_create_folder(y_f, month_folder_name) target_folder = self.get_or_create_folder(year_folder, month_folder_name)
# 3. SPOSTAMENTO DEFINITIVO # 3. SPOSTAMENTO DEFINITIVO
item.Move(target_folder) item.Move(target_folder)
# Piccola pausa per dare respiro al server Exchange # Piccola pausa per dare respiro al server Exchange
time.sleep(0.1) time.sleep(0.1)
except Exception as e: except Exception as e:
print(f"Errore su singola mail: {e}") logger.error(f"Errore elaborazione email {i}: {e}")
continue continue
except Exception as e: except Exception as e:
self.current_mail = f"Errore critico: {str(e)}" error_msg = f"Errore critico: {str(e)}"
logger.critical(error_msg, exc_info=e)
self.current_mail = error_msg
self.running = False self.running = False
def get_or_create_folder(self, parent, name): def get_or_create_folder(self, parent, name: str):
"""Recupera una cartella se esiste, altrimenti la crea."""
try: try:
return parent.Folders.Item(name) return parent.Folders.Item(name)
except: except Exception:
logger.debug(f"Cartella '{name}' non trovata, creazione...")
return parent.Folders.Add(name) return parent.Folders.Add(name)
def process_attachments(self, mail_item, onedrive_path, received_time, processed_files): def process_attachments(self, mail_item, onedrive_path: str, received_time: datetime,
processed_files: Dict[str, str]) -> None:
"""Rimuove gli allegati reali, li salva su OneDrive e inserisce il link nella mail.""" """Rimuove gli allegati reali, li salva su OneDrive e inserisce il link nella mail."""
if mail_item.Class != 43: return # 43 = OlMail if mail_item.Class != OutlookConstants.MAIL_CLASS:
return
date_prefix = received_time.strftime("%Y-%m-%d") date_prefix = received_time.strftime("%Y-%m-%d")
# Usiamo un ciclo reverse sugli allegati per la rimozione sicura # Usiamo un ciclo reverse sugli allegati per la rimozione sicura
count = mail_item.Attachments.Count count = mail_item.Attachments.Count
for j in range(count, 0, -1): for j in range(count, 0, -1):
try: try:
att = mail_item.Attachments.Item(j) att = mail_item.Attachments.Item(j)
# Filtro: Solo file reali (Type 1), escludendo immagini nelle firme (Inline) # Filtro: Solo file reali, escludendo immagini nelle firme (Inline)
if att.Type != 1: if att.Type != OutlookConstants.ATTACHMENT_TYPE_FILE:
continue continue
is_inline = False is_inline = False
try: try:
# Controlla il tag MAPI per il Content-ID delle immagini incorporate # Controlla il tag MAPI per il Content-ID delle immagini incorporate
if att.PropertyAccessor.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x3712001E"): if att.PropertyAccessor.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x3712001E"):
is_inline = True is_inline = True
except: pass except Exception:
pass
if is_inline: if is_inline:
continue continue
print("da qui gestisco gli allegati")
# Salvataggio fisico del file # Salvataggio fisico del file
temp_path = os.path.join(os.environ['TEMP'], att.FileName) temp_path = os.path.join(os.environ['TEMP'], att.FileName)
att.SaveAsFile(temp_path) att.SaveAsFile(temp_path)
f_hash = get_file_hash(temp_path) f_hash = get_file_hash(temp_path)
# Nome file univoco con Data e Hash corto # Nome file univoco con Data e Hash corto
unique_name = f"{date_prefix}_{f_hash[:6]}_{att.FileName}" unique_name = f"{date_prefix}_{f_hash[:6]}_{att.FileName}"
dest_path = os.path.join(onedrive_path, unique_name) dest_path = os.path.join(onedrive_path, unique_name)
if f_hash not in processed_files: if f_hash not in processed_files:
if not os.path.exists(dest_path): if not os.path.exists(dest_path):
os.replace(temp_path, dest_path) os.replace(temp_path, dest_path)
processed_files[f_hash] = dest_path processed_files[f_hash] = dest_path
logger.debug(f"File salvato: {unique_name}")
else: else:
if os.path.exists(temp_path): if os.path.exists(temp_path):
os.remove(temp_path) os.remove(temp_path)
logger.debug(f"File duplicato eliminato: {att.FileName}")
# Inserimento link HTML nel corpo della mail # Inserimento link HTML nel corpo della mail
link_html = ( link_html = (
@@ -165,13 +205,12 @@ class ArchiverGUI:
f"<a href='file:///{dest_path}'>{att.FileName}</a></div>" f"<a href='file:///{dest_path}'>{att.FileName}</a></div>"
) )
mail_item.HTMLBody = link_html + mail_item.HTMLBody mail_item.HTMLBody = link_html + mail_item.HTMLBody
# RIMOZIONE FISICA DALLA MAIL # RIMOZIONE FISICA DALLA MAIL
mail_item.Attachments.Remove(j) mail_item.Attachments.Remove(j)
except Exception as e: except Exception as e:
print(f"Errore su allegato {j}: {e}") logger.error(f"Errore elaborazione allegato {j}: {e}")
continue
# --- INTERFACCIA GRAFICA (NICEGUI) --- # --- INTERFACCIA GRAFICA (NICEGUI) ---