Compare commits

..

8 Commits

16 changed files with 133692 additions and 92 deletions

80
.env.example Normal file
View File

@@ -0,0 +1,80 @@
# ASE Application - Environment Variables
# Copia questo file in .env e modifica i valori secondo le tue necessità
# ============================================================================
# Server Mode Configuration
# ============================================================================
# Server protocol mode: ftp or sftp
# - ftp: Traditional FTP server (requires FTP_PASSIVE_PORTS and FTP_EXTERNAL_IP)
# - sftp: SFTP server over SSH (more secure, requires SSH host key)
# Default: ftp
FTP_MODE=ftp
# ============================================================================
# FTP Server Configuration (only for FTP_MODE=ftp)
# ============================================================================
# Porta iniziale del range di porte passive FTP
# Il range completo sarà da FTP_PASSIVE_PORTS a (FTP_PASSIVE_PORTS + portRangeWidth - 1)
# Default: valore da ftp.ini
FTP_PASSIVE_PORTS=60000
# IP esterno da pubblicizzare ai client FTP (importante per HA con VIP)
# Questo è l'indirizzo che i client useranno per connettersi in modalità passiva
# In un setup HA, questo dovrebbe essere il VIP condiviso tra le istanze
# Default: valore da ftp.ini
FTP_EXTERNAL_IP=192.168.1.100
# ============================================================================
# Database Configuration
# ============================================================================
# Hostname del server MySQL
# Default: valore da db.ini
DB_HOST=localhost
# Porta del server MySQL
# Default: valore da db.ini
DB_PORT=3306
# Username per la connessione al database
# Default: valore da db.ini
DB_USER=ase_user
# Password per la connessione al database
# Default: valore da db.ini
DB_PASSWORD=your_secure_password
# Nome del database
# Default: valore da db.ini
DB_NAME=ase_lar
# ============================================================================
# Logging Configuration
# ============================================================================
# Livello di logging: DEBUG, INFO, WARNING, ERROR, CRITICAL
# Default: INFO
LOG_LEVEL=INFO
# ============================================================================
# Note per Docker Compose
# ============================================================================
#
# 1. Le variabili d'ambiente OVERRIDE i valori nei file .ini
# 2. Se una variabile non è impostata, viene usato il valore dal file .ini
# 3. Questo permette deployment flessibili senza modificare i file .ini
#
# Esempio di uso in docker-compose.yml:
#
# environment:
# FTP_PASSIVE_PORTS: "${FTP_PASSIVE_PORTS:-60000}"
# FTP_EXTERNAL_IP: "${FTP_EXTERNAL_IP}"
# DB_HOST: "${DB_HOST}"
# DB_PASSWORD: "${DB_PASSWORD}"
#
# Oppure usando env_file:
#
# env_file:
# - .env

127
docker-compose.example.yml Normal file
View File

@@ -0,0 +1,127 @@
# NOTE(review): the top-level `version` key is obsolete in Compose V2 and is
# ignored there; kept for compatibility with older docker-compose releases.
version: '3.8'

services:
  # ==========================================================================
  # FTP Server (Traditional FTP)
  # ==========================================================================
  ftp-server:
    build: .
    container_name: ase-ftp-server
    ports:
      - "2121:2121"                # FTP control port
      - "40000-40449:40000-40449"  # FTP passive ports range
    environment:
      # Server Mode
      FTP_MODE: "ftp"  # Mode: ftp or sftp
      # FTP Configuration
      FTP_PASSIVE_PORTS: "40000"        # First port of the passive range
      FTP_EXTERNAL_IP: "192.168.1.100"  # External IP / VIP advertised to clients
      # Database Configuration
      # NOTE(review): example credentials — replace them and keep real secrets
      # out of version control (prefer env_file or a secret store).
      DB_HOST: "mysql-server"
      DB_PORT: "3306"
      DB_USER: "ase_user"
      DB_PASSWORD: "your_secure_password"
      DB_NAME: "ase_lar"
      # File Processing Behavior
      # DELETE_AFTER_PROCESSING: "true"  # Delete files after successful processing (default: false = keep files)
      # Logging (optional)
      LOG_LEVEL: "INFO"
    volumes:
      - ./logs/ftp:/app/logs
      - ./data:/app/data
    depends_on:
      - mysql-server
    restart: unless-stopped
    networks:
      - ase-network

  # ==========================================================================
  # SFTP Server (SSH File Transfer Protocol)
  # ==========================================================================
  sftp-server:
    build: .
    container_name: ase-sftp-server
    ports:
      - "2222:22"  # SFTP port (SSH)
    environment:
      # Server Mode
      FTP_MODE: "sftp"  # Mode: ftp or sftp
      # Database Configuration
      DB_HOST: "mysql-server"
      DB_PORT: "3306"
      DB_USER: "ase_user"
      DB_PASSWORD: "your_secure_password"
      DB_NAME: "ase_lar"
      # File Processing Behavior
      # DELETE_AFTER_PROCESSING: "true"  # Delete files after successful processing (default: false = keep files)
      # Logging (optional)
      LOG_LEVEL: "INFO"
    volumes:
      - ./logs/sftp:/app/logs
      - ./data:/app/data
      - ./ssh_host_key:/app/ssh_host_key:ro  # SSH host key (generate with: ssh-keygen -t rsa -f ssh_host_key)
    depends_on:
      - mysql-server
    restart: unless-stopped
    networks:
      - ase-network

  # ==========================================================================
  # Example: HA setup with multiple FTP instances (same VIP)
  # ==========================================================================
  ftp-server-2:
    build: .
    container_name: ase-ftp-server-2
    ports:
      - "2122:2121"                # Different control port for the second instance
      - "41000-41449:40000-40449"  # Different passive range on the host
    # NOTE(review): this instance advertises passive ports 40000-40449 (its
    # internal config / FTP_EXTERNAL_IP) while the host exposes 41000-41449.
    # Clients connecting to the VIP on 40000-40449 would reach instance 1 —
    # confirm the load balancer / NAT in front of the VIP remaps ports for
    # this instance, otherwise passive transfers to it will fail.
    environment:
      FTP_MODE: "ftp"
      FTP_PASSIVE_PORTS: "40000"        # Same internal config
      FTP_EXTERNAL_IP: "192.168.1.100"  # Same shared VIP
      DB_HOST: "mysql-server"
      DB_PORT: "3306"
      DB_USER: "ase_user"
      DB_PASSWORD: "your_secure_password"
      DB_NAME: "ase_lar"
      # DELETE_AFTER_PROCESSING: "true"  # Delete files after successful processing (default: false = keep files)
      LOG_LEVEL: "INFO"
    volumes:
      - ./logs/ftp2:/app/logs
      - ./data:/app/data
    depends_on:
      - mysql-server
    restart: unless-stopped
    networks:
      - ase-network

  mysql-server:
    image: mysql:8.0
    container_name: ase-mysql
    environment:
      MYSQL_ROOT_PASSWORD: "root_password"
      MYSQL_DATABASE: "ase_lar"
      MYSQL_USER: "ase_user"
      MYSQL_PASSWORD: "your_secure_password"
    ports:
      - "3306:3306"
    volumes:
      - mysql-data:/var/lib/mysql
      # SQL files here are executed by the MySQL image on first startup only.
      - ./dbddl:/docker-entrypoint-initdb.d
    restart: unless-stopped
    networks:
      - ase-network

networks:
  ase-network:
    driver: bridge

volumes:
  mysql-data:

View File

@@ -0,0 +1,71 @@
# File Deletion Policy
## Comportamento di Default
Per impostazione predefinita, i file ricevuti via FTP/SFTP vengono **mantenuti** sul server dopo l'elaborazione:
- **Elaborazione riuscita**: il file viene rinominato con timestamp e salvato nella directory dell'utente, i dati vengono inseriti nel database
- **Elaborazione fallita**: il file rimane nella directory dell'utente per permettere debug e riprocessamento manuale
## Abilitare la Cancellazione Automatica
Per cancellare automaticamente i file dopo un'elaborazione **riuscita**, imposta la variabile d'ambiente nel `docker-compose.yml`:
```yaml
environment:
DELETE_AFTER_PROCESSING: "true"
```
### Valori Accettati
La variabile accetta i seguenti valori (case-insensitive):
- `true`, `1`, `yes` → cancellazione **abilitata**
- `false`, `0`, `no` o qualsiasi altro valore → cancellazione **disabilitata** (default)
## Comportamento con DELETE_AFTER_PROCESSING=true
| Scenario | Comportamento |
|----------|---------------|
| File elaborato con successo | ✅ Dati inseriti nel DB → File **cancellato** |
| Errore durante elaborazione | ❌ Errore loggato → File **mantenuto** per debug |
| File vuoto | 🗑️ File cancellato immediatamente (comportamento esistente) |
## Log
Quando un file viene cancellato dopo l'elaborazione, viene loggato:
```
INFO: File example_20250103120000.csv loaded successfully
INFO: File example_20250103120000.csv deleted after successful processing
```
In caso di errore durante la cancellazione:
```
WARNING: Failed to delete file example_20250103120000.csv: [errno] [description]
```
## Esempio Configurazione
### Mantenere i file (default)
```yaml
ftp-server:
environment:
DB_HOST: "mysql-server"
# DELETE_AFTER_PROCESSING non impostata o impostata a false
```
### Cancellare i file dopo elaborazione
```yaml
ftp-server:
environment:
DB_HOST: "mysql-server"
DELETE_AFTER_PROCESSING: "true"
```
## Note Implementative
- La cancellazione avviene **solo dopo** l'inserimento riuscito nel database
- Se la cancellazione fallisce, viene loggato un warning ma l'elaborazione è considerata riuscita
- I file con errori di elaborazione rimangono sempre sul server indipendentemente dalla configurazione
- La policy si applica sia a FTP che a SFTP

252
docs/FTP_SFTP_SETUP.md Normal file
View File

@@ -0,0 +1,252 @@
# FTP/SFTP Server Setup Guide
Il sistema ASE supporta sia FTP che SFTP utilizzando lo stesso codice Python. La modalità viene selezionata tramite la variabile d'ambiente `FTP_MODE`.
## Modalità Supportate
### FTP (File Transfer Protocol)
- **Protocollo**: FTP classico
- **Porta**: 21 (o configurabile)
- **Sicurezza**: Non criptato (considera FTPS per produzione)
- **Porte passive**: Richiede un range di porte configurabile
- **Caso d'uso**: Compatibilità con client legacy, performance
### SFTP (SSH File Transfer Protocol)
- **Protocollo**: SSH-based file transfer
- **Porta**: 22 (o configurabile)
- **Sicurezza**: Criptato tramite SSH
- **Porte passive**: Non necessarie (usa solo la porta SSH)
- **Caso d'uso**: Sicurezza, firewall-friendly
## Configurazione
### Variabili d'Ambiente
#### Comuni a entrambi i protocolli
```bash
FTP_MODE=ftp # o "sftp"
DB_HOST=mysql-server
DB_PORT=3306
DB_USER=ase_user
DB_PASSWORD=password
DB_NAME=ase_lar
LOG_LEVEL=INFO
```
#### Specifiche per FTP
```bash
FTP_PASSIVE_PORTS=40000 # Prima porta del range passivo
FTP_EXTERNAL_IP=192.168.1.100 # VIP per HA
```
#### Specifiche per SFTP
```bash
# Nessuna variabile specifica - richiede solo SSH host key
```
## Setup Docker Compose
### Server FTP
```yaml
services:
ftp-server:
build: .
container_name: ase-ftp-server
ports:
- "2121:2121"
- "40000-40449:40000-40449"
environment:
FTP_MODE: "ftp"
FTP_PASSIVE_PORTS: "40000"
FTP_EXTERNAL_IP: "192.168.1.100"
DB_HOST: "mysql-server"
DB_USER: "ase_user"
DB_PASSWORD: "password"
DB_NAME: "ase_lar"
volumes:
- ./logs/ftp:/app/logs
- ./data:/app/data
```
### Server SFTP
```yaml
services:
sftp-server:
build: .
container_name: ase-sftp-server
ports:
- "2222:22"
environment:
FTP_MODE: "sftp"
DB_HOST: "mysql-server"
DB_USER: "ase_user"
DB_PASSWORD: "password"
DB_NAME: "ase_lar"
volumes:
- ./logs/sftp:/app/logs
- ./data:/app/data
- ./ssh_host_key:/app/ssh_host_key:ro
```
## Generazione SSH Host Key per SFTP
Prima di avviare il server SFTP, genera la chiave SSH:
```bash
ssh-keygen -t rsa -b 4096 -f ssh_host_key -N ""
```
Questo crea:
- `ssh_host_key` - Chiave privata (monta nel container)
- `ssh_host_key.pub` - Chiave pubblica
## Autenticazione
Entrambi i protocolli usano lo stesso sistema di autenticazione:
1. **Admin user**: Configurato in `ftp.ini`
2. **Virtual users**: Salvati nella tabella `virtusers` del database
3. **Password**: SHA256 hash
4. **Sincronizzazione**: Automatica tra tutte le istanze (legge sempre dal DB)
## Comandi SITE (solo FTP)
I comandi SITE sono disponibili solo in modalità FTP:
```bash
ftp> site addu username password # Aggiungi utente
ftp> site disu username # Disabilita utente
ftp> site enau username # Abilita utente
ftp> site lstu # Lista utenti
```
In modalità SFTP, usa lo script `load_ftp_users.py` per gestire gli utenti.
## High Availability (HA)
### Setup HA con FTP
Puoi eseguire più istanze FTP che condividono lo stesso VIP:
```yaml
ftp-server-1:
environment:
FTP_EXTERNAL_IP: "192.168.1.100" # VIP condiviso
ports:
- "2121:2121"
- "40000-40449:40000-40449"
ftp-server-2:
environment:
FTP_EXTERNAL_IP: "192.168.1.100" # Stesso VIP
ports:
- "2122:2121"
- "41000-41449:40000-40449" # Range diverso sull'host
```
### Setup HA con SFTP
Più semplice, nessuna configurazione di porte passive:
```yaml
sftp-server-1:
ports:
- "2222:22"
sftp-server-2:
ports:
- "2223:22"
```
## Testing
### Test FTP
```bash
ftp 192.168.1.100 2121
# Username: admin (o utente dal database)
# Password: <password>
ftp> ls
ftp> put file.csv
ftp> bye
```
### Test SFTP
```bash
sftp -P 2222 admin@192.168.1.100
# Password: <password>
sftp> ls
sftp> put file.csv
sftp> exit
```
## Monitoring
I log sono disponibili sia su file che su console (Docker):
```bash
# Visualizza log FTP
docker logs ase-ftp-server
# Visualizza log SFTP
docker logs ase-sftp-server
# Segui i log in tempo reale
docker logs -f ase-ftp-server
```
## Troubleshooting
### FTP: Errore "Can't connect to passive port"
- Verifica che il range di porte passive sia mappato correttamente in Docker
- Controlla che `FTP_EXTERNAL_IP` sia impostato correttamente
- Verifica che `FTP_PASSIVE_PORTS` corrisponda al range configurato
### SFTP: Errore "Connection refused"
- Verifica che l'SSH host key esista e sia montato correttamente
- Controlla i permessi del file SSH host key (deve essere leggibile)
- Installa `asyncssh`: `pip install asyncssh`
### Autenticazione fallita (entrambi)
- Verifica che il database sia raggiungibile
- Controlla che le credenziali del database siano corrette
- Verifica che l'utente esista nella tabella `virtusers` e sia abilitato (`disabled_at IS NULL`)
## Dipendenze
### FTP
```bash
pip install pyftpdlib mysql-connector-python
```
### SFTP
```bash
pip install asyncssh aiomysql
```
## Performance
- **FTP**: Più veloce per trasferimenti di file grandi, minore overhead
- **SFTP**: Leggermente più lento a causa della crittografia SSH, ma più sicuro
## Sicurezza
### FTP
- ⚠️ Non criptato - considera FTPS per produzione
- Abilita `permit_foreign_addresses` per NAT/proxy
- Usa firewall per limitare accesso
### SFTP
- ✅ Completamente criptato tramite SSH
- ✅ Più sicuro per Internet pubblico
- ✅ Supporta autenticazione a chiave pubblica (future enhancement)
## Migration
Per migrare da FTP a SFTP:
1. Avvia server SFTP con stesse credenziali database
2. Testa connessione SFTP
3. Migra client gradualmente
4. Spegni server FTP quando tutti i client sono migrati
Gli utenti e i dati rimangono gli stessi!

114540
logs/non_sysgeo.txt Normal file

File diff suppressed because it is too large Load Diff

17641
logs/sysgeo.txt Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -7,13 +7,14 @@ requires-python = ">=3.12"
dependencies = [
"aiomysql>=0.2.0",
"cryptography>=45.0.3",
"mysql-connector-python>=9.3.0", # Needed for synchronous DB connections (ftp_csv_receiver.py, load_ftp_users.py)
"mysql-connector-python>=9.3.0", # Needed for synchronous DB connections (ftp_csv_receiver.py, load_ftp_users.py)
"pyftpdlib>=2.0.1",
"pyproj>=3.7.1",
"utm>=0.8.1",
"aiofiles>=24.1.0",
"aiosmtplib>=3.0.2",
"aioftp>=0.22.3",
"asyncssh>=2.21.1",
]
[dependency-groups]

View File

@@ -1,67 +1,43 @@
#!.venv/bin/python
"""
This module implements an FTP server with custom commands for
This module implements an FTP/SFTP server with custom commands for
managing virtual users and handling CSV file uploads.
Server mode is controlled by FTP_MODE environment variable:
- FTP_MODE=ftp (default): Traditional FTP server
- FTP_MODE=sftp: SFTP (SSH File Transfer Protocol) server
"""
import asyncio
import logging
import os
import sys
from hashlib import sha256
from logging.handlers import RotatingFileHandler
from pathlib import Path
from pyftpdlib.authorizers import AuthenticationFailed, DummyAuthorizer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
from utils.authorizers.database_authorizer import DatabaseAuthorizer
from utils.config import loader_ftp_csv as setting
from utils.connect import file_management, user_admin
from utils.database.connection import connetti_db
# Configure logging (moved inside main function)
logger = logging.getLogger(__name__)
class DummySha256Authorizer(DummyAuthorizer):
"""Custom authorizer that uses SHA256 for password hashing and manages users from a database."""
def __init__(self: object, cfg: dict) -> None:
"""Initializes the authorizer, adds the admin user, and loads users from the database.
Args:
cfg: The configuration object.
"""
super().__init__()
self.add_user(cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3])
# Define the database connection
conn = connetti_db(cfg)
# Create a cursor
cur = conn.cursor()
cur.execute(f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL")
for ftpuser, user_hash, virtpath, perm in cur.fetchall():
# Create the user's directory if it does not exist.
try:
Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
self.add_user(ftpuser, user_hash, virtpath, perm)
except Exception as e: # pylint: disable=broad-except
self.responde(f"551 Error in create virtual user path: {e}")
def validate_authentication(self: object, username: str, password: str, handler: object) -> None:
# Validate the user's password against the stored user_hash
user_hash = sha256(password.encode("UTF-8")).hexdigest()
try:
if self.user_table[username]["pwd"] != user_hash:
raise KeyError
except KeyError:
raise AuthenticationFailed # noqa: B904
# Legacy authorizer kept for reference (not used anymore)
# The DatabaseAuthorizer is now used for real-time database synchronization
class ASEHandler(FTPHandler):
"""Custom FTP handler that extends FTPHandler with custom commands and file handling."""
# Permetti connessioni dati da indirizzi IP diversi (importante per NAT/proxy)
permit_foreign_addresses = True
def __init__(self: object, conn: object, server: object, ioloop: object = None) -> None:
"""Initializes the handler, adds custom commands, and sets up command permissions.
@@ -137,36 +113,132 @@ class ASEHandler(FTPHandler):
return user_admin.ftp_SITE_LSTU(self, line)
def main():
"""Main function to start the FTP server."""
# Load the configuration settings
cfg = setting.Config()
def setup_logging(log_filename: str):
"""
Configura il logging per il server FTP con rotation e output su console.
Args:
log_filename (str): Percorso del file di log.
"""
root_logger = logging.getLogger()
formatter = logging.Formatter("%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s")
# Rimuovi eventuali handler esistenti
if root_logger.hasHandlers():
root_logger.handlers.clear()
# Handler per file con rotation (max 10MB per file, mantiene 5 backup)
file_handler = RotatingFileHandler(
log_filename,
maxBytes=10 * 1024 * 1024, # 10 MB
backupCount=5, # Mantiene 5 file di backup
encoding="utf-8"
)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
# Handler per console (utile per Docker)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
root_logger.setLevel(logging.INFO)
root_logger.info("Logging FTP configurato con rotation (10MB, 5 backup) e console output")
def start_ftp_server(cfg):
"""Start traditional FTP server."""
try:
# Initialize the authorizer and handler
authorizer = DummySha256Authorizer(cfg)
# Initialize the authorizer with database support
# This authorizer checks the database on every login, ensuring
# all FTP server instances stay synchronized without restarts
authorizer = DatabaseAuthorizer(cfg)
# Initialize handler
handler = ASEHandler
handler.cfg = cfg
handler.authorizer = authorizer
handler.masquerade_address = cfg.proxyaddr
# Set the range of passive ports for the FTP server
_range = list(range(cfg.firstport, cfg.firstport + cfg.portrangewidth))
handler.passive_ports = _range
# Configure logging
logging.basicConfig(
format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
# Use cfg.logfilename directly without checking its existence
filename=cfg.logfilename,
level=logging.INFO,
# Set masquerade address only if configured (importante per HA con VIP)
# Questo è l'IP che il server FTP pubblicherà ai client per le connessioni passive
if cfg.proxyaddr and cfg.proxyaddr.strip():
handler.masquerade_address = cfg.proxyaddr
logger.info(f"FTP masquerade address configured: {cfg.proxyaddr}")
else:
logger.info("FTP masquerade address not configured - using server's default IP")
# Set the range of passive ports for the FTP server
passive_ports_range = list(range(cfg.firstport, cfg.firstport + cfg.portrangewidth))
handler.passive_ports = passive_ports_range
# Log configuration
logger.info(f"Starting FTP server on port {cfg.service_port} with DatabaseAuthorizer")
logger.info(
f"FTP passive ports configured: {cfg.firstport}-{cfg.firstport + cfg.portrangewidth - 1} "
f"({len(passive_ports_range)} ports)"
)
logger.info(f"FTP permit_foreign_addresses: {handler.permit_foreign_addresses}")
logger.info(f"Database connection: {cfg.dbuser}@{cfg.dbhost}:{cfg.dbport}/{cfg.dbname}")
# Create and start the FTP server
server = FTPServer(("0.0.0.0", cfg.service_port), handler)
server.serve_forever()
except Exception as e:
logger.error("Exit with error: %s.", e)
logger.error("FTP server error: %s", e, exc_info=True)
sys.exit(1)
async def start_sftp_server_async(cfg):
"""Start SFTP server (async)."""
try:
from utils.servers.sftp_server import start_sftp_server
logger.info(f"Starting SFTP server on port {cfg.service_port}")
logger.info(f"Database connection: {cfg.dbuser}@{cfg.dbhost}:{cfg.dbport}/{cfg.dbname}")
# Start SFTP server
server = await start_sftp_server(cfg, host="0.0.0.0", port=cfg.service_port)
# Keep server running
await asyncio.Event().wait()
except ImportError as e:
logger.error("SFTP mode requires 'asyncssh' library. Install with: pip install asyncssh")
logger.error(f"Error: {e}")
sys.exit(1)
except Exception as e:
logger.error("SFTP server error: %s", e, exc_info=True)
sys.exit(1)
def main():
"""Main function to start FTP or SFTP server based on FTP_MODE environment variable."""
# Load the configuration settings
cfg = setting.Config()
# Configure logging first
setup_logging(cfg.logfilename)
# Get server mode from environment variable (default: ftp)
server_mode = os.getenv("FTP_MODE", "ftp").lower()
if server_mode not in ["ftp", "sftp"]:
logger.error(f"Invalid FTP_MODE: {server_mode}. Valid values: ftp, sftp")
sys.exit(1)
logger.info(f"Server mode: {server_mode.upper()}")
try:
if server_mode == "ftp":
start_ftp_server(cfg)
elif server_mode == "sftp":
asyncio.run(start_sftp_server_async(cfg))
except KeyboardInterrupt:
logger.info("Server stopped by user")
except Exception as e:
logger.error("Unexpected error: %s", e, exc_info=True)
sys.exit(1)
if __name__ == "__main__":

View File

@@ -68,6 +68,38 @@ def fetch_data_from_db(connection: mysql.connector.MySQLConnection) -> list[tupl
cursor.close()
def fetch_existing_users(connection: mysql.connector.MySQLConnection) -> dict[str, tuple]:
"""
Fetches existing FTP users from virtusers table.
Args:
connection (mysql.connector.MySQLConnection): The database connection object.
Returns:
dict: Dictionary mapping username to (is_enabled, has_matching_password).
is_enabled is True if disabled_at is NULL.
"""
try:
cursor = connection.cursor()
query = """
SELECT ftpuser, disabled_at
FROM ase_lar.virtusers
"""
cursor.execute(query)
results = cursor.fetchall()
# Create dictionary: username -> is_enabled
users_dict = {username: (disabled_at is None) for username, disabled_at in results}
logger.info("Trovati %s utenti esistenti in virtusers", len(users_dict))
return users_dict
except mysql.connector.Error as e:
logger.error("Errore query database virtusers: %s", e)
return {}
finally:
cursor.close()
def send_site_command(ftp: FTP, command: str) -> bool:
"""
Sends a SITE command to the FTP server.
@@ -90,9 +122,13 @@ def send_site_command(ftp: FTP, command: str) -> bool:
def main():
"""
Main function to connect to the database, fetch FTP user data, and send SITE ADDU commands to the FTP server.
Main function to connect to the database, fetch FTP user data, and synchronize users to FTP server.
This function is idempotent - it can be run multiple times safely:
- If user exists and is enabled: skips
- If user exists but is disabled: enables it (SITE ENAU)
- If user doesn't exist: creates it (SITE ADDU)
"""
logger.info("Avvio script caricamento utenti FTP")
logger.info("Avvio script caricamento utenti FTP (idempotente)")
cfg = setting.Config()
# Connessioni
@@ -100,32 +136,58 @@ def main():
ftp_connection = connect_ftp()
try:
# Preleva dati dal database
data = fetch_data_from_db(db_connection)
# Preleva utenti da sincronizzare
users_to_sync = fetch_data_from_db(db_connection)
if not data:
logger.warning("Nessun dato trovato nel database")
if not users_to_sync:
logger.warning("Nessun utente da sincronizzare nel database ftp_accounts")
return
success_count = 0
# Preleva utenti già esistenti
existing_users = fetch_existing_users(db_connection)
added_count = 0
enabled_count = 0
skipped_count = 0
error_count = 0
# Processa ogni riga
for row in data:
# Processa ogni utente
for row in users_to_sync:
username, password = row
# Costruisci il comando SITE completo
ftp_site_command = f"addu {username} {password}"
if username in existing_users:
is_enabled = existing_users[username]
logger.info("Sending ftp command: %s", ftp_site_command)
if is_enabled:
# Utente già esiste ed è abilitato - skip
logger.info("Utente %s già esiste ed è abilitato - skip", username)
skipped_count += 1
else:
# Utente esiste ma è disabilitato - riabilita
logger.info("Utente %s esiste ma è disabilitato - riabilito con SITE ENAU", username)
ftp_site_command = f"enau {username}"
# Invia comando SITE
if send_site_command(ftp_connection, ftp_site_command):
success_count += 1
if send_site_command(ftp_connection, ftp_site_command):
enabled_count += 1
else:
error_count += 1
else:
error_count += 1
# Utente non esiste - crea
logger.info("Utente %s non esiste - creazione con SITE ADDU", username)
ftp_site_command = f"addu {username} {password}"
logger.info("Elaborazione completata. Successi: %s, Errori: %s", success_count, error_count)
if send_site_command(ftp_connection, ftp_site_command):
added_count += 1
else:
error_count += 1
logger.info(
"Elaborazione completata. Aggiunti: %s, Riabilitati: %s, Saltati: %s, Errori: %s",
added_count,
enabled_count,
skipped_count,
error_count
)
except Exception as e: # pylint: disable=broad-except
logger.error("Errore generale: %s", e)

View File

@@ -121,7 +121,7 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
# Cache hit! Use cached module
modulo = _module_cache[module_name]
cache_key = module_name
logger.debug("Modulo caricato dalla cache: %s", module_name)
logger.info("Modulo caricato dalla cache: %s", module_name)
break
# If not in cache, import dynamically
@@ -133,7 +133,7 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
# Store in cache for future use
_module_cache[module_name] = modulo
cache_key = module_name
logger.info("Funzione 'main_loader' caricata dal modulo %s (cached)", module_name)
logger.info("Modulo caricato per la prima volta: %s", module_name)
break
except (ImportError, AttributeError) as e:
logger.debug(

View File

@@ -0,0 +1,177 @@
"""
Database-backed authorizer for FTP server that checks authentication against database in real-time.
This ensures multiple FTP server instances stay synchronized without needing restarts.
"""
import logging
from hashlib import sha256
from pathlib import Path
from pyftpdlib.authorizers import AuthenticationFailed, DummyAuthorizer
from utils.database.connection import connetti_db
logger = logging.getLogger(__name__)
class DatabaseAuthorizer(DummyAuthorizer):
    """
    Custom authorizer that validates users against the database on every login.

    This approach ensures that:
    - Multiple FTP server instances stay synchronized
    - User changes (add/remove/disable) are reflected immediately
    - No server restart is needed when users are modified

    NOTE(review): a fresh database connection is opened on every login attempt
    and every has_user() check — fine at low login volume; consider pooling if
    the login rate grows.
    """

    def __init__(self, cfg: dict) -> None:
        """
        Initializes the authorizer with the admin user only.

        Regular users are validated against the database at login time, so no
        user list is preloaded here.

        Args:
            cfg: The configuration object (provides adminuser, dbname,
                dbusertable and the settings consumed by connetti_db).
        """
        super().__init__()
        self.cfg = cfg
        # Add admin user to in-memory authorizer (always available, even when
        # the database is unreachable).
        self.add_user(
            cfg.adminuser[0],  # username
            cfg.adminuser[1],  # password hash
            cfg.adminuser[2],  # home directory
            perm=cfg.adminuser[3]  # permissions
        )
        logger.info("DatabaseAuthorizer initialized with admin user")

    def validate_authentication(self, username: str, password: str, handler: object) -> None:
        """
        Validates user authentication against the database.

        This method is called on every login attempt and checks:
        1. If user is admin, use in-memory credentials
        2. Otherwise, query database for user credentials
        3. Verify password hash matches
        4. Ensure user is not disabled

        Args:
            username: The username attempting to login.
            password: The plain-text password provided.
            handler: The FTP handler object (unused here; required by the
                pyftpdlib authorizer interface).

        Raises:
            AuthenticationFailed: If authentication fails for any reason.
        """
        # Hash the provided password; stored hashes are hex SHA256 digests.
        password_hash = sha256(password.encode("UTF-8")).hexdigest()
        # Check if user is admin (stored in memory)
        if username == self.cfg.adminuser[0]:
            if self.user_table[username]["pwd"] != password_hash:
                logger.warning(f"Failed admin login attempt for user: {username}")
                raise AuthenticationFailed("Invalid credentials")
            return
        # For regular users, check database
        try:
            conn = connetti_db(self.cfg)
            cur = conn.cursor()
            # Query user from database. The username is passed as a bound
            # parameter; dbname/dbusertable are interpolated as identifiers
            # (placeholders cannot name tables).
            # NOTE(review): those identifiers come from config, not user
            # input — confirm the config source is trusted.
            cur.execute(
                f"SELECT ftpuser, hash, virtpath, perm, disabled_at FROM {self.cfg.dbname}.{self.cfg.dbusertable} WHERE ftpuser = %s",
                (username,)
            )
            result = cur.fetchone()
            cur.close()
            conn.close()
            if not result:
                logger.warning(f"Login attempt for non-existent user: {username}")
                raise AuthenticationFailed("Invalid credentials")
            ftpuser, stored_hash, virtpath, perm, disabled_at = result
            # Check if user is disabled (non-NULL disabled_at means disabled)
            if disabled_at is not None:
                logger.warning(f"Login attempt for disabled user: {username}")
                raise AuthenticationFailed("User account is disabled")
            # Verify password
            if stored_hash != password_hash:
                logger.warning(f"Invalid password for user: {username}")
                raise AuthenticationFailed("Invalid credentials")
            # Authentication successful - ensure user directory exists
            try:
                Path(virtpath).mkdir(parents=True, exist_ok=True)
            except Exception as e:
                logger.error(f"Failed to create directory for user {username}: {e}")
                raise AuthenticationFailed("System error")
            # Temporarily add user to in-memory table for this session.
            # This allows pyftpdlib to work correctly for the duration of the
            # session. We add/update the table directly to avoid issues with
            # add_user() rejecting already-existing users.
            if username in self.user_table:
                # User already exists, just update credentials
                self.user_table[username]['pwd'] = stored_hash
                self.user_table[username]['home'] = virtpath
                self.user_table[username]['perm'] = perm
                self.user_table[username]['operms'] = {}
            else:
                # User doesn't exist, add to table directly with all required fields
                self.user_table[username] = {
                    'pwd': stored_hash,
                    'home': virtpath,
                    'perm': perm,
                    'operms': {},  # Optional per-directory permissions
                    'msg_login': '230 Login successful.',
                    'msg_quit': '221 Goodbye.'
                }
            logger.info(f"Successful login for user: {username}")
        except AuthenticationFailed:
            # Re-raise authentication failures unchanged (including the ones
            # raised above inside this try block).
            raise
        except Exception as e:
            # Any other error (DB unreachable, schema mismatch, ...) is logged
            # with traceback and surfaced to the client as a generic failure.
            logger.error(f"Database error during authentication for user {username}: {e}", exc_info=True)
            raise AuthenticationFailed("System error")

    def has_user(self, username: str) -> bool:
        """
        Check if a user exists in the database or in-memory table.

        This is called by pyftpdlib for various checks. We override it to check
        the database as well as the in-memory table.

        Args:
            username: The username to check.

        Returns:
            True if user exists and is enabled, False otherwise.
        """
        # Check in-memory first (for admin and active sessions)
        if username in self.user_table:
            return True
        # Check database for regular users
        try:
            conn = connetti_db(self.cfg)
            cur = conn.cursor()
            cur.execute(
                f"SELECT COUNT(*) FROM {self.cfg.dbname}.{self.cfg.dbusertable} WHERE ftpuser = %s AND disabled_at IS NULL",
                (username,)
            )
            count = cur.fetchone()[0]
            cur.close()
            conn.close()
            return count > 0
        except Exception as e:
            # On DB errors, err on the side of "unknown user" rather than crash.
            logger.error(f"Database error checking user existence for {username}: {e}")
            return False

View File

@@ -1,5 +1,6 @@
"""set configurations"""
import os
from configparser import ConfigParser
from . import ENV_PARENT_PATH
@@ -10,15 +11,21 @@ class Config:
"""
Initializes the Config class by reading configuration files.
It loads settings from 'ftp.ini' and 'db.ini' for FTP server, CSV, logging, and database.
Environment variables override INI file settings for Docker deployments.
"""
c = ConfigParser()
c.read([f"{ENV_PARENT_PATH}/env/ftp.ini", f"{ENV_PARENT_PATH}/env/db.ini"])
# FTP setting
self.service_port = c.getint("ftpserver", "service_port")
self.firstport = c.getint("ftpserver", "firstPort")
self.proxyaddr = c.get("ftpserver", "proxyAddr")
# FTP setting (with environment variable override for Docker)
self.service_port = int(os.getenv("FTP_PORT", c.getint("ftpserver", "service_port")))
# FTP_PASSIVE_PORTS: override della porta iniziale del range passivo
self.firstport = int(os.getenv("FTP_PASSIVE_PORTS", c.getint("ftpserver", "firstPort")))
# FTP_EXTERNAL_IP: override dell'IP pubblicizzato (VIP per HA)
self.proxyaddr = os.getenv("FTP_EXTERNAL_IP", c.get("ftpserver", "proxyAddr"))
self.portrangewidth = c.getint("ftpserver", "portRangeWidth")
self.virtpath = c.get("ftpserver", "virtpath")
self.adminuser = c.get("ftpserver", "adminuser").split("|")
@@ -27,18 +34,22 @@ class Config:
self.fileext = c.get("ftpserver", "fileext").upper().split("|")
self.defperm = c.get("ftpserver", "defaultUserPerm")
# File processing behavior: delete files after successful processing
# Set DELETE_AFTER_PROCESSING=true in docker-compose to enable
self.delete_after_processing = os.getenv("DELETE_AFTER_PROCESSING", "false").lower() in ("true", "1", "yes")
# CSV FILE setting
self.csvfs = c.get("csvfs", "path")
# LOG setting
self.logfilename = c.get("logging", "logFilename")
# DB setting
self.dbhost = c.get("db", "hostname")
self.dbport = c.getint("db", "port")
self.dbuser = c.get("db", "user")
self.dbpass = c.get("db", "password")
self.dbname = c.get("db", "dbName")
# DB setting (with environment variable override for Docker)
self.dbhost = os.getenv("DB_HOST", c.get("db", "hostname"))
self.dbport = int(os.getenv("DB_PORT", c.getint("db", "port")))
self.dbuser = os.getenv("DB_USER", c.get("db", "user"))
self.dbpass = os.getenv("DB_PASSWORD", c.get("db", "password"))
self.dbname = os.getenv("DB_NAME", c.get("db", "dbName"))
self.max_retries = c.getint("db", "maxRetries")
# Tables

View File

@@ -109,6 +109,14 @@ async def on_file_received_async(self: object, file: str) -> None:
# Note: autocommit=True in connection, no need for explicit commit
logger.info(f"File {new_filename} loaded successfully")
# Delete file after successful processing if configured
if getattr(cfg, 'delete_after_processing', False):
try:
os.remove(f"{path}/{new_filename}")
logger.info(f"File {new_filename} deleted after successful processing")
except Exception as e:
logger.warning(f"Failed to delete file {new_filename}: {e}")
except Exception as e:
logger.error(f"File {new_filename} not loaded. Held in user path.")
logger.error(f"{e}")

View File

@@ -4,6 +4,7 @@ import logging
import os
import signal
from collections.abc import Callable, Coroutine
from logging.handlers import RotatingFileHandler
from typing import Any
import aiomysql
@@ -33,24 +34,37 @@ class WorkerFormatter(logging.Formatter):
def setup_logging(log_filename: str, log_level_str: str):
"""Configura il logging globale.
"""Configura il logging globale con rotation automatica.
Args:
log_filename (str): Percorso del file di log.
log_level_str (str): Livello di log (es. "INFO", "DEBUG").
"""
logger = logging.getLogger()
handler = logging.FileHandler(log_filename)
formatter = WorkerFormatter("%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s")
handler.setFormatter(formatter)
# Rimuovi eventuali handler esistenti e aggiungi il nostro
# Rimuovi eventuali handler esistenti
if logger.hasHandlers():
logger.handlers.clear()
logger.addHandler(handler)
# Handler per file con rotation (max 10MB per file, mantiene 5 backup)
file_handler = RotatingFileHandler(
log_filename,
maxBytes=10 * 1024 * 1024, # 10 MB
backupCount=5, # Mantiene 5 file di backup
encoding="utf-8"
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Handler per console (utile per Docker)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
log_level = getattr(logging, log_level_str.upper(), logging.INFO)
logger.setLevel(log_level)
logger.info("Logging configurato correttamente")
logger.info("Logging configurato correttamente con rotation (10MB, 5 backup)")
def setup_signal_handlers(logger: logging.Logger):

View File

@@ -0,0 +1,240 @@
"""
SFTP Server implementation using asyncssh.
Shares the same authentication system and file handling logic as the FTP server.
"""
import asyncio
import logging
from pathlib import Path
import asyncssh
from utils.connect import file_management
from utils.database.connection import connetti_db_async
logger = logging.getLogger(__name__)
class ASESFTPServer(asyncssh.SFTPServer):
    """Custom SFTP server that handles file uploads with the same logic as FTP server.

    NOTE(review): this class does not appear to be used anywhere in this
    module — start_sftp_server passes SFTPFileHandler as the sftp_factory.
    Confirm whether it is dead code.
    """
    def __init__(self, chan):
        """Initialize SFTP server with channel."""
        super().__init__(chan)
        # Get config from connection (set during authentication)
        # NOTE(review): reads the private attribute _cfg placed on the
        # connection by ASESSHServer.connection_made — fragile coupling.
        self.cfg = chan.get_connection()._cfg
    async def close(self):
        """Called when SFTP session is closed."""
        # NOTE(review): relies on the asyncssh-private self._chan attribute
        # and on the base close() being awaitable — verify against the
        # asyncssh version in use.
        logger.info(f"SFTP session closed for user: {self._chan.get_connection().get_extra_info('username')}")
        await super().close()
class ASESSHServer(asyncssh.SSHServer):
    """Custom SSH server for SFTP authentication using database.

    Validates credentials with the same rules as the FTP authorizer:
    an admin account defined in the configuration (``cfg.adminuser``)
    plus regular users stored in ``cfg.dbname.cfg.dbusertable``.
    """
    def __init__(self, cfg):
        """Initialize SSH server with configuration.

        Args:
            cfg: Application configuration (DB credentials, admin user
                definition as [name, sha256_hex, home, ...], table names).
        """
        self.cfg = cfg
        # Home directories resolved during authentication, keyed by
        # username; read later by SFTPFileHandler to chroot the session.
        self.user_home_dirs = {}
        super().__init__()
    def connection_made(self, conn):
        """Called when connection is established."""
        # Stash config and a back-reference on the connection so the SFTP
        # factory (which only receives the channel) can reach them.
        conn._cfg = self.cfg
        conn._ssh_server = self  # Store reference to server for accessing user_home_dirs
        logger.info(f"SSH connection from {conn.get_extra_info('peername')[0]}")
    def connection_lost(self, exc):
        """Called when connection is lost."""
        if exc:
            logger.error(f"SSH connection lost: {exc}")
    async def validate_password(self, username, password):
        """
        Validate user credentials against database.
        Same logic as DatabaseAuthorizer for FTP.

        Args:
            username: Login name presented by the client.
            password: Clear-text password presented by the client.

        Returns:
            True when credentials match and the user's home directory
            could be created; False otherwise (including on DB errors).
        """
        from hashlib import sha256
        from hmac import compare_digest
        # Hash the provided password
        password_hash = sha256(password.encode("UTF-8")).hexdigest()
        # Check if user is admin
        if username == self.cfg.adminuser[0]:
            # compare_digest is constant-time: avoids leaking how much of
            # the hash matched via response timing.
            if compare_digest(self.cfg.adminuser[1], password_hash):
                # Store admin home directory
                self.user_home_dirs[username] = self.cfg.adminuser[2]
                logger.info(f"Admin user '{username}' authenticated successfully (home: {self.cfg.adminuser[2]})")
                return True
            logger.warning(f"Failed admin login attempt for user: {username}")
            return False
        # For regular users, check database
        try:
            conn = await connetti_db_async(self.cfg)
            try:
                cur = await conn.cursor()
                try:
                    # Query user from database
                    await cur.execute(
                        f"SELECT ftpuser, hash, virtpath, perm, disabled_at FROM {self.cfg.dbname}.{self.cfg.dbusertable} WHERE ftpuser = %s",
                        (username,)
                    )
                    result = await cur.fetchone()
                finally:
                    await cur.close()
            finally:
                # Fix: connection/cursor previously leaked when the query
                # raised, since cleanup only ran on the success path.
                conn.close()
            if not result:
                logger.warning(f"SFTP login attempt for non-existent user: {username}")
                return False
            ftpuser, stored_hash, virtpath, perm, disabled_at = result
            # Check if user is disabled
            if disabled_at is not None:
                logger.warning(f"SFTP login attempt for disabled user: {username}")
                return False
            # Verify password (constant-time; a non-str stored hash raises
            # TypeError, which the outer except turns into a denied login —
            # same outcome as the former `!=` mismatch).
            if not compare_digest(stored_hash, password_hash):
                logger.warning(f"Invalid password for SFTP user: {username}")
                return False
            # Authentication successful - ensure user directory exists
            try:
                Path(virtpath).mkdir(parents=True, exist_ok=True)
            except Exception as e:
                logger.error(f"Failed to create directory for user {username}: {e}")
                return False
            # Store the user's home directory for chroot
            self.user_home_dirs[username] = virtpath
            logger.info(f"Successful SFTP login for user: {username} (home: {virtpath})")
            return True
        except Exception as e:
            logger.error(f"Database error during SFTP authentication for user {username}: {e}", exc_info=True)
            return False
    def password_auth_supported(self):
        """Enable password authentication."""
        return True
    def begin_auth(self, username):
        """Called when authentication begins; returning True forces auth."""
        logger.debug(f"Authentication attempt for user: {username}")
        return True
class SFTPFileHandler(asyncssh.SFTPServer):
    """Extended SFTP server with file upload handling.

    Each session is chrooted to the home directory resolved by
    ASESSHServer during authentication; CSV files are handed to the
    shared file_management pipeline when the upload handle is closed.
    """
    def __init__(self, chan):
        # Chroot the session to the authenticated user's home directory.
        super().__init__(chan, chroot=self._get_user_home(chan))
        # Config injected on the connection by ASESSHServer.connection_made.
        self.cfg = chan.get_connection()._cfg
        self._open_files = {}  # Track open files for processing (handle -> real path)
    @staticmethod
    def _get_user_home(chan):
        """Get the home directory for the authenticated user."""
        conn = chan.get_connection()
        username = conn.get_extra_info('username')
        ssh_server = getattr(conn, '_ssh_server', None)
        if ssh_server and username in ssh_server.user_home_dirs:
            return ssh_server.user_home_dirs[username]
        # Fallback for admin user
        if hasattr(conn, '_cfg') and username == conn._cfg.adminuser[0]:
            return conn._cfg.adminuser[2]
        # NOTE(review): returning None disables the chroot entirely for an
        # unknown user — confirm this fallback is intended.
        return None
    def open(self, path, pflags, attrs):
        """Track files being opened for writing."""
        result = super().open(path, pflags, attrs)
        # If file is opened for writing (pflags contains FXF_WRITE)
        if pflags & 0x02:  # FXF_WRITE flag
            real_path = self.map_path(path)
            # Convert bytes to str if necessary
            if isinstance(real_path, bytes):
                real_path = real_path.decode('utf-8')
            self._open_files[result] = real_path
            logger.debug(f"File opened for writing: {real_path}")
        return result
    async def close(self, file_obj):
        """Process file after it's closed."""
        # Call parent close first (this doesn't return anything useful)
        result = super().close(file_obj)
        # Check if this file was tracked as a write handle in open()
        if file_obj in self._open_files:
            filepath = self._open_files.pop(file_obj)
            # Process CSV files (extension check is case-insensitive)
            if filepath.lower().endswith('.csv'):
                try:
                    logger.info(f"CSV file closed after upload via SFTP: {filepath}")
                    # NOTE(review): self._chan is an asyncssh-private
                    # attribute — verify it exists in the pinned version.
                    username = self._chan.get_connection().get_extra_info('username')
                    # Create a mock handler object exposing the attributes
                    # on_file_received_async reads (cfg, username)
                    mock_handler = type('obj', (object,), {
                        'cfg': self.cfg,
                        'username': username
                    })()
                    # Call the file processing function
                    # NOTE(review): duplicates the module-level import of
                    # file_management — the local import is redundant.
                    from utils.connect import file_management
                    await file_management.on_file_received_async(mock_handler, filepath)
                except Exception as e:
                    logger.error(f"Error processing SFTP file on close: {e}", exc_info=True)
        return result
    async def exit(self):
        """Handle session close."""
        await super().exit()
        # Note: File processing is handled in close() method, not here
        # This avoids double-processing when both close and rename are called
async def start_sftp_server(cfg, host='0.0.0.0', port=22, host_key_path='/app/ssh_host_key'):
    """
    Start SFTP server.

    Args:
        cfg: Configuration object
        host: Host to bind to
        port: Port to bind to
        host_key_path: Path to the SSH host private key presented to
            clients (must exist; generate it e.g. with ssh-keygen).
            Defaults to the previous hard-coded '/app/ssh_host_key'.

    Returns:
        asyncssh server object
    """
    logger.info(f"Starting SFTP server on {host}:{port}")
    # Fix: a fresh ASESSHServer is created for EACH incoming connection.
    # Previously one shared instance served every connection, so the
    # mutable user_home_dirs dict was shared across concurrent clients.
    server = await asyncssh.create_server(
        lambda: ASESSHServer(cfg),
        host,
        port,
        server_host_keys=[host_key_path],
        sftp_factory=SFTPFileHandler,
    )
    logger.info(f"SFTP server started successfully on {host}:{port}")
    logger.info(f"Database connection: {cfg.dbuser}@{cfg.dbhost}:{cfg.dbport}/{cfg.dbname}")
    return server

304
test_ftp_client.py Executable file
View File

@@ -0,0 +1,304 @@
#!/home/alex/devel/ASE/.venv/bin/python
"""
Script di test per inviare file CSV via FTP al server ftp_csv_receiver.py
Legge gli utenti dalla tabella ftp_accounts e carica i file dalla directory corrispondente.
"""
import logging
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from ftplib import FTP
from pathlib import Path
from threading import Lock
import mysql.connector
# Add src directory to Python path
src_path = Path(__file__).parent / "src"
sys.path.insert(0, str(src_path))
from utils.config import users_loader as setting
from utils.database.connection import connetti_db
# Logging configuration (completed in main() once the logs directory exists)
logger = logging.getLogger(__name__)
# FTP server endpoint and base path for the per-user CSV directories
FTP_CONFIG = {"host": "localhost", "port": 2121}
BASE_CSV_PATH = Path("/home/alex/Scrivania/archivio_csv")
# Number of parallel workers used to test throughput
MAX_WORKERS = 10  # Adjust this value to increase/decrease parallelism
# Lock for thread-safe logging
log_lock = Lock()
def fetch_ftp_users(connection: mysql.connector.MySQLConnection) -> list[tuple]:
    """
    Fetch username/password pairs from the ftp_accounts table.

    Args:
        connection: Open MySQL connection.

    Returns:
        List of (username, password) tuples; empty list on query failure.
    """
    cursor = None
    try:
        cursor = connection.cursor()
        query = """
            SELECT username, password
            FROM ase_lar.ftp_accounts
            WHERE username IS NOT NULL AND password IS NOT NULL
        """
        cursor.execute(query)
        results = cursor.fetchall()
        logger.info("Prelevati %s utenti dal database", len(results))
        return results
    except mysql.connector.Error as e:
        logger.error("Errore query database: %s", e)
        return []
    finally:
        # Fix: cursor was unbound here when connection.cursor() raised,
        # turning the original error into a NameError.
        if cursor is not None:
            cursor.close()
def create_remote_dir(ftp: FTP, remote_dir: str) -> None:
    """
    Recursively create every level of ``remote_dir`` on the FTP server.

    Errors from MKD (typically "directory already exists") are ignored
    so the function is safe to call repeatedly.

    Args:
        ftp: Active FTP connection.
        remote_dir: Directory path to create (e.g. "home/ID0354/subdir").
    """
    if not remote_dir or remote_dir == ".":
        return
    # Build the cumulative path one segment at a time, skipping empty
    # segments produced by leading or duplicated slashes.
    accumulated = ""
    for segment in remote_dir.split("/"):
        if not segment:
            continue
        accumulated = segment if not accumulated else f"{accumulated}/{segment}"
        try:
            ftp.mkd(accumulated)
        except Exception:  # pylint: disable=broad-except
            # Level already exists (or other error) — keep going.
            pass
def upload_files_for_user(username: str, password: str) -> tuple[str, str, bool, int, int]:
    """
    Upload all CSV files from the user's directory via FTP.

    Searches recursively through every subdirectory, handles both the
    .csv and .CSV extensions, and recreates the directory structure on
    the remote server.

    Args:
        username: FTP username (also the local source directory name).
        password: FTP password.

    Returns:
        Tuple (username, status_message, success, files_uploaded, total_files);
        status_message is one of 'OK', 'NO_UPLOAD', 'NO_DIR', 'NO_FILES', 'ERROR'.
    """
    user_csv_path = BASE_CSV_PATH / username
    with log_lock:
        logger.info("[%s] Inizio elaborazione", username)
    # The user's local directory must exist
    if not user_csv_path.exists():
        with log_lock:
            logger.warning("[%s] Directory non trovata: %s", username, user_csv_path)
        return (username, "NO_DIR", False, 0, 0)
    # Find all CSV files recursively (both .csv and .CSV)
    # NOTE(review): on a case-insensitive filesystem the two globs may
    # return the same file twice — confirm the target platform.
    csv_files = []
    csv_files.extend(user_csv_path.glob("**/*.csv"))
    csv_files.extend(user_csv_path.glob("**/*.CSV"))
    if not csv_files:
        with log_lock:
            logger.warning("[%s] Nessun file CSV trovato in %s", username, user_csv_path)
        return (username, "NO_FILES", False, 0, 0)
    total_files = len(csv_files)
    with log_lock:
        logger.info("[%s] Trovati %s file CSV", username, total_files)
    # FTP connection; ftp is tracked so the socket is always released
    ftp = None
    try:
        ftp = FTP()
        ftp.connect(FTP_CONFIG["host"], FTP_CONFIG["port"])
        ftp.login(username, password)
        with log_lock:
            logger.info("[%s] Connesso al server FTP", username)
        # Upload every CSV file, preserving the directory structure
        uploaded = 0
        for csv_file in csv_files:
            try:
                # Path relative to the user's base directory
                relative_path = csv_file.relative_to(user_csv_path)
                # If the file lives in a subdirectory, recreate it remotely
                if relative_path.parent != Path("."):
                    remote_dir = str(relative_path.parent).replace("\\", "/")
                    create_remote_dir(ftp, remote_dir)
                    remote_file = str(relative_path).replace("\\", "/")
                else:
                    remote_file = csv_file.name
                # Upload the file (spaces in names are handled by ftplib)
                with log_lock:
                    logger.debug("[%s] Caricamento file: '%s'", username, remote_file)
                with open(csv_file, "rb") as f:
                    ftp.storbinary(f"STOR {remote_file}", f)
                with log_lock:
                    logger.info("[%s] File caricato: %s", username, remote_file)
                uploaded += 1
            except Exception as e:  # pylint: disable=broad-except
                with log_lock:
                    logger.error("[%s] Errore caricamento file %s: %s", username, csv_file.name, e)
        ftp.quit()
        ftp = None  # quit() already closed the connection cleanly
        with log_lock:
            logger.info("[%s] Upload completato: %s/%s file caricati", username, uploaded, total_files)
        return (username, "OK" if uploaded > 0 else "NO_UPLOAD", uploaded > 0, uploaded, total_files)
    except Exception as e:  # pylint: disable=broad-except
        with log_lock:
            logger.error("[%s] Errore FTP: %s", username, e)
        # total_files is always bound here (set before the try block)
        return (username, "ERROR", False, 0, total_files)
    finally:
        # Fix: previously the socket leaked when connect/login or quit
        # raised — only the success path called quit().
        if ftp is not None:
            try:
                ftp.close()
            except Exception:  # pylint: disable=broad-except
                pass
def main():
    """
    Main entry point: uploads CSV files over FTP for every database user,
    running the per-user uploads in parallel and logging a summary report.
    """
    # Configure logging with a file inside the logs directory
    log_dir = Path(__file__).parent / "logs"
    log_dir.mkdir(exist_ok=True)
    log_file = log_dir / "test_ftp_client.log"
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(),  # Also keep console output
        ],
    )
    logger.info("=== Avvio test client FTP (modalità parallela) ===")
    logger.info("Log file: %s", log_file)
    logger.info("Path base CSV: %s", BASE_CSV_PATH)
    logger.info("Server FTP: %s:%s", FTP_CONFIG["host"], FTP_CONFIG["port"])
    logger.info("Worker paralleli: %s", MAX_WORKERS)
    # Database connection
    cfg = setting.Config()
    db_connection = connetti_db(cfg)
    try:
        # Fetch the FTP users from the database
        users = fetch_ftp_users(db_connection)
        if not users:
            logger.warning("Nessun utente trovato nel database")
            return
        logger.info("Avvio upload parallelo per %s utenti...", len(users))
        logger.info("")
        # Use a ThreadPoolExecutor for parallel uploads
        results = []
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            # Submit all tasks
            futures = {executor.submit(upload_files_for_user, username, password): username for username, password in users}
            # Collect results as they complete
            for future in as_completed(futures):
                username = futures[future]
                try:
                    result = future.result()
                    results.append(result)
                except Exception as e:  # pylint: disable=broad-except
                    logger.error("[%s] Eccezione durante l'upload: %s", username, e)
                    results.append((username, "ERROR", False, 0, 0))
        # Analyse the results
        logger.info("")
        logger.info("=== Test completato ===")
        success_count = sum(1 for _, _, success, _, _ in results if success)
        error_count = len(results) - success_count
        total_uploaded = sum(uploaded for _, _, _, uploaded, _ in results)
        total_files = sum(total for _, _, _, _, total in results)
        # Group the users by status
        users_no_dir = [username for username, status, _, _, _ in results if status == "NO_DIR"]
        users_no_files = [username for username, status, _, _, _ in results if status == "NO_FILES"]
        users_error = [username for username, status, _, _, _ in results if status == "ERROR"]
        users_ok = [username for username, status, _, _, _ in results if status == "OK"]
        logger.info("Utenti con successo: %s/%s", success_count, len(users))
        logger.info("Utenti con errori: %s/%s", error_count, len(users))
        logger.info("File caricati totali: %s/%s", total_uploaded, total_files)
        # Report users without a CSV directory
        if users_no_dir:
            logger.info("")
            logger.info("=== Utenti senza directory CSV (%s) ===", len(users_no_dir))
            for username in sorted(users_no_dir):
                logger.info(" - %s (directory attesa: %s)", username, BASE_CSV_PATH / username)
        # Report users with an empty directory
        if users_no_files:
            logger.info("")
            logger.info("=== Utenti con directory vuota (%s) ===", len(users_no_files))
            for username in sorted(users_no_files):
                logger.info(" - %s", username)
        # Report users with FTP errors
        if users_error:
            logger.info("")
            logger.info("=== Utenti con errori FTP (%s) ===", len(users_error))
            for username in sorted(users_error):
                logger.info(" - %s", username)
        # Per-user detail for successful uploads
        if users_ok:
            logger.info("")
            logger.info("=== Dettaglio utenti con successo (%s) ===", len(users_ok))
            for username, status, _, uploaded, total in sorted(results):
                if status == "OK":
                    logger.info("[%s] %s/%s file caricati", username, uploaded, total)
    except Exception as e:  # pylint: disable=broad-except
        logger.error("Errore generale: %s", e)
        sys.exit(1)
    finally:
        try:
            db_connection.close()
            logger.info("")
            logger.info("Connessione MySQL chiusa")
        except Exception as e:  # pylint: disable=broad-except
            logger.error("Errore chiusura connessione MySQL: %s", e)
# Script entry point: run the parallel FTP upload test.
if __name__ == "__main__":
    main()