#!/home/alex/devel/ASE/.venv/bin/python
|
|
"""
Test script that sends CSV files via FTP to the ftp_csv_receiver.py server.

It reads the users from the ftp_accounts table and uploads the files found in
each user's matching directory.
"""
|
|
|
|
import logging
|
|
import sys
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from ftplib import FTP
|
|
from pathlib import Path
|
|
from threading import Lock
|
|
|
|
import mysql.connector
|
|
|
|
# Add src directory to Python path
|
|
src_path = Path(__file__).parent / "src"
|
|
sys.path.insert(0, str(src_path))
|
|
|
|
from utils.config import users_loader as setting
|
|
from utils.database.connection import connetti_db
|
|
|
|
# Logging setup is finished inside main(), once the logs directory exists.
logger = logging.getLogger(__name__)

# FTP server endpoint targeted by the test uploads.
FTP_CONFIG = {
    "host": "localhost",
    "port": 2121,
}

# Root folder; each user's CSV files are looked up in BASE_CSV_PATH / <username>.
BASE_CSV_PATH = Path("/home/alex/Scrivania/archivio_csv")

# Size of the worker pool; change it to raise or lower the upload parallelism.
MAX_WORKERS = 10

# Lock held by the worker threads while they emit log records.
log_lock = Lock()
|
|
|
|
|
|
def fetch_ftp_users(connection: mysql.connector.MySQLConnection) -> list[tuple]:
    """
    Fetch the FTP credentials stored in the ``ase_lar.ftp_accounts`` table.

    Rows whose username or password is NULL are excluded by the query.

    Args:
        connection: MySQL connection used to run the query.

    Returns:
        List of ``(username, password)`` tuples. An empty list is returned
        when the query fails with a ``mysql.connector.Error`` (the error is
        logged, not raised).
    """
    query = """
        SELECT username, password
        FROM ase_lar.ftp_accounts
        WHERE username IS NOT NULL AND password IS NOT NULL
    """

    # Bind the name before the try block: if connection.cursor() itself raises,
    # the finally clause must not fail on an unbound variable and hide the
    # original error.
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        results = cursor.fetchall()

        logger.info("Prelevati %s utenti dal database", len(results))
        return results

    except mysql.connector.Error as e:
        logger.error("Errore query database: %s", e)
        return []

    finally:
        if cursor is not None:
            cursor.close()
|
|
|
|
|
|
def create_remote_dir(ftp: FTP, remote_dir: str) -> None:
    """
    Create ``remote_dir`` on the FTP server one path level at a time.

    Every prefix of the path is sent to ``ftp.mkd``, parents first. A
    permanent (5xx) reply is ignored, since that is what a server typically
    answers when the directory already exists. Any other failure (socket
    errors, temporary or malformed replies) is propagated to the caller
    instead of being silently dropped.

    Args:
        ftp: Active FTP connection.
        remote_dir: Slash-separated directory path to create
            (e.g. "home/ID0354/subdir"). Empty segments are skipped; an empty
            string or "." is a no-op.

    Raises:
        Exception: whatever ``ftp.mkd`` raises, except ``ftplib.error_perm``.
    """
    # Local import: the module itself only imports ``FTP`` from ftplib.
    from ftplib import error_perm

    if not remote_dir or remote_dir == ".":
        return

    current_path = ""
    for part in remote_dir.split("/"):
        if not part:  # skip empty segments produced by "//" or a trailing "/"
            continue

        current_path = f"{current_path}/{part}" if current_path else part

        try:
            ftp.mkd(current_path)
        except error_perm:
            # Most likely the directory already exists. If it is a genuine
            # permission problem, the following STOR will report it.
            continue
|
|
|
|
|
|
def upload_files_for_user(username: str, password: str) -> tuple[str, str, bool, int, int]:
    """
    Upload every CSV file found under the user's local directory via FTP.

    The directory ``BASE_CSV_PATH / username`` is scanned recursively and the
    ``.csv`` extension is matched case-insensitively. Each file is stored on
    the server under the same relative path it has locally; the remote
    sub-directories are created first. A failure on a single file is logged
    and the remaining files are still attempted.

    Args:
        username: FTP user name, also the name of the local sub-directory.
        password: FTP password.

    Returns:
        Tuple ``(username, status, success, uploaded, total)`` where
        ``status`` is one of:

        - ``'OK'``: at least one file was uploaded.
        - ``'NO_UPLOAD'``: the session worked but no file could be uploaded.
        - ``'NO_DIR'``: the local directory does not exist.
        - ``'NO_FILES'``: the local directory contains no CSV file.
        - ``'ERROR'``: the FTP session itself failed (connect, login, quit).

        ``success`` is True only for ``'OK'``.
    """
    user_csv_path = BASE_CSV_PATH / username

    with log_lock:
        logger.info("[%s] Inizio elaborazione", username)

    if not user_csv_path.exists():
        with log_lock:
            logger.warning("[%s] Directory non trovata: %s", username, user_csv_path)
        return (username, "NO_DIR", False, 0, 0)

    # A single case-insensitive scan: two separate "*.csv" / "*.CSV" globs
    # return every file twice on case-insensitive filesystems and miss mixed
    # case names. Directories that merely end in ".csv" are filtered out.
    csv_files = sorted(
        path
        for path in user_csv_path.rglob("*")
        if path.is_file() and path.name.lower().endswith(".csv")
    )

    if not csv_files:
        with log_lock:
            logger.warning("[%s] Nessun file CSV trovato in %s", username, user_csv_path)
        return (username, "NO_FILES", False, 0, 0)

    total_files = len(csv_files)
    with log_lock:
        logger.info("[%s] Trovati %s file CSV", username, total_files)

    uploaded = 0
    try:
        # The context manager closes the connection on every exit path,
        # including a failed login.
        with FTP() as ftp:
            ftp.connect(FTP_CONFIG["host"], FTP_CONFIG["port"])
            ftp.login(username, password)
            with log_lock:
                logger.info("[%s] Connesso al server FTP", username)

            for csv_file in csv_files:
                try:
                    relative_path = csv_file.relative_to(user_csv_path)
                    remote_file = relative_path.as_posix()

                    # Files in a sub-directory need the remote tree to exist.
                    if relative_path.parent != Path("."):
                        create_remote_dir(ftp, relative_path.parent.as_posix())

                    with log_lock:
                        logger.debug("[%s] Caricamento file: '%s'", username, remote_file)
                    with open(csv_file, "rb") as f:
                        ftp.storbinary(f"STOR {remote_file}", f)
                    with log_lock:
                        logger.info("[%s] File caricato: %s", username, remote_file)
                    uploaded += 1
                except Exception as e:  # pylint: disable=broad-except
                    # Best effort: one bad file must not stop the others.
                    with log_lock:
                        logger.error("[%s] Errore caricamento file %s: %s", username, csv_file.name, e)

    except Exception as e:  # pylint: disable=broad-except
        with log_lock:
            logger.error("[%s] Errore FTP: %s", username, e)
        return (username, "ERROR", False, uploaded, total_files)

    with log_lock:
        logger.info("[%s] Upload completato: %s/%s file caricati", username, uploaded, total_files)
    return (username, "OK" if uploaded > 0 else "NO_UPLOAD", uploaded > 0, uploaded, total_files)
|
|
|
|
|
|
def _log_report(results: list[tuple[str, str, bool, int, int]], user_count: int) -> None:
    """Log the aggregate counters and the per-status user lists of a run."""
    logger.info("")
    logger.info("=== Test completato ===")

    success_count = sum(1 for _, _, success, _, _ in results if success)
    error_count = len(results) - success_count
    total_uploaded = sum(uploaded for _, _, _, uploaded, _ in results)
    total_files = sum(total for _, _, _, _, total in results)

    # Group the user names by the status returned by the worker.
    by_status: dict[str, list[str]] = {}
    for username, status, _, _, _ in results:
        by_status.setdefault(status, []).append(username)

    logger.info("Utenti con successo: %s/%s", success_count, user_count)
    logger.info("Utenti con errori: %s/%s", error_count, user_count)
    logger.info("File caricati totali: %s/%s", total_uploaded, total_files)

    users_no_dir = by_status.get("NO_DIR", [])
    if users_no_dir:
        logger.info("")
        logger.info("=== Utenti senza directory CSV (%s) ===", len(users_no_dir))
        for username in sorted(users_no_dir):
            logger.info(" - %s (directory attesa: %s)", username, BASE_CSV_PATH / username)

    users_no_files = by_status.get("NO_FILES", [])
    if users_no_files:
        logger.info("")
        logger.info("=== Utenti con directory vuota (%s) ===", len(users_no_files))
        for username in sorted(users_no_files):
            logger.info(" - %s", username)

    users_error = by_status.get("ERROR", [])
    if users_error:
        logger.info("")
        logger.info("=== Utenti con errori FTP (%s) ===", len(users_error))
        for username in sorted(users_error):
            logger.info(" - %s", username)

    # Users whose session worked but whose files all failed would otherwise
    # be counted as errors without appearing in any list.
    users_no_upload = by_status.get("NO_UPLOAD", [])
    if users_no_upload:
        logger.info("")
        logger.info("=== Utenti con tutti gli upload falliti (%s) ===", len(users_no_upload))
        for username in sorted(users_no_upload):
            logger.info(" - %s", username)

    users_ok = by_status.get("OK", [])
    if users_ok:
        logger.info("")
        logger.info("=== Dettaglio utenti con successo (%s) ===", len(users_ok))
        for username, status, _, uploaded, total in sorted(results):
            if status == "OK":
                logger.info("[%s] %s/%s file caricati", username, uploaded, total)


def main():
    """
    Run the parallel FTP upload test.

    Sets up logging to ``logs/test_ftp_client.log`` (next to this script) and
    to the console, reads the FTP users from the database, uploads the files
    of each user in a thread pool of ``MAX_WORKERS`` workers and logs a final
    report. Exits with status 1 on an unexpected error; the database
    connection is closed in every case.
    """
    log_dir = Path(__file__).parent / "logs"
    log_dir.mkdir(exist_ok=True)
    log_file = log_dir / "test_ftp_client.log"

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            # Explicit encoding: the messages contain accented characters.
            logging.FileHandler(log_file, encoding="utf-8"),
            logging.StreamHandler(),  # keep the console output as well
        ],
    )

    logger.info("=== Avvio test client FTP (modalità parallela) ===")
    logger.info("Log file: %s", log_file)
    logger.info("Path base CSV: %s", BASE_CSV_PATH)
    logger.info("Server FTP: %s:%s", FTP_CONFIG["host"], FTP_CONFIG["port"])
    logger.info("Worker paralleli: %s", MAX_WORKERS)

    cfg = setting.Config()
    db_connection = connetti_db(cfg)

    try:
        users = fetch_ftp_users(db_connection)

        if not users:
            logger.warning("Nessun utente trovato nel database")
            return

        logger.info("Avvio upload parallelo per %s utenti...", len(users))
        logger.info("")

        results = []
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            # Map each future back to its user so a crashed task can still be
            # reported under the right name.
            futures = {
                executor.submit(upload_files_for_user, username, password): username
                for username, password in users
            }

            for future in as_completed(futures):
                username = futures[future]
                try:
                    results.append(future.result())
                except Exception as e:  # pylint: disable=broad-except
                    logger.error("[%s] Eccezione durante l'upload: %s", username, e)
                    results.append((username, "ERROR", False, 0, 0))

        _log_report(results, len(users))

    except Exception as e:  # pylint: disable=broad-except
        # logger.exception keeps the traceback, which a plain error() drops.
        logger.exception("Errore generale: %s", e)
        sys.exit(1)

    finally:
        try:
            db_connection.close()
            logger.info("")
            logger.info("Connessione MySQL chiusa")
        except Exception as e:  # pylint: disable=broad-except
            logger.error("Errore chiusura connessione MySQL: %s", e)


if __name__ == "__main__":
    main()
|