toloto redis rimesso mysql

This commit is contained in:
2025-11-01 16:58:21 +01:00
parent c850cc6e7e
commit c1de73ac73
9 changed files with 527 additions and 168 deletions

View File

@@ -1,6 +1,6 @@
services:
mysql:
image: mariadb:10.11
image: mysql:8.0
container_name: mysql
restart: unless-stopped
environment:
@@ -15,26 +15,13 @@ services:
ports:
- "3306:3306"
healthcheck:
test: ["CMD", "healthcheck.sh", "--connect", "--innodb_initialized"]
test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-p${MYSQL_ROOT_PASSWORD:-Ase@2025}"]
interval: 10s
timeout: 5s
retries: 3
labels:
logging: "promtail"
logging_jobname: "mysql"
redis:
image: redis:7-alpine
container_name: redis-master
restart: unless-stopped
command: redis-server --appendonly yes --requirepass ${REDIS_PASSWORD:-Ase@2025}
volumes:
- redis_data:/data
networks:
- app-network
ports:
- "6379:6379"
labels:
logging: "promtail"
orchestrator-1-load:
build: .
container_name: orchestrator-1-load
@@ -42,7 +29,6 @@ services:
command: ["python", "-m", "src.load_orchestrator"]
environment:
DB_HOST: ${VIP:-192.168.1.210}
REDIS_HOST: ${VIP:-192.168.1.210}
ORCHESTRATOR_ID: 1
volumes:
- app-logs:/app/logs
@@ -57,7 +43,6 @@ services:
command: ["python", "-m", "src.elab_orchestrator"]
environment:
DB_HOST: ${VIP:-192.168.1.210}
REDIS_HOST: ${VIP:-192.168.1.210}
ORCHESTRATOR_ID: 2
volumes:
- app-logs:/app/logs
@@ -72,7 +57,6 @@ services:
command: ["python", "-m", "src.send_orchestrator"]
environment:
DB_HOST: ${VIP:-192.168.1.210}
REDIS_HOST: ${VIP:-192.168.1.210}
ORCHESTRATOR_ID: 3
volumes:
- app-logs:/app/logs
@@ -87,7 +71,6 @@ services:
command: ["python", "-m", "src.ftp_csv_receiver"]
environment:
DB_HOST: ${VIP:-192.168.1.210}
REDIS_HOST: ${VIP:-192.168.1.210}
FTP_INSTANCE_ID: 1
volumes:
- app-logs:/app/logs
@@ -132,5 +115,4 @@ networks:
app-network:
volumes:
mysql_data:
redis_data:
app-logs:

View File

@@ -9,54 +9,20 @@ import os
from hashlib import sha256
from pathlib import Path
from pyftpdlib.authorizers import AuthenticationFailed, DummyAuthorizer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
from utils.authorizers.database_authorizer import DatabaseAuthorizer
from utils.config import loader_ftp_csv as setting
from utils.connect import file_management, user_admin
from utils.database.connection import connetti_db
# Configure logging (moved inside main function)
logger = logging.getLogger(__name__)
class DummySha256Authorizer(DummyAuthorizer):
"""Custom authorizer that uses SHA256 for password hashing and manages users from a database."""
def __init__(self: object, cfg: dict) -> None:
"""Initializes the authorizer, adds the admin user, and loads users from the database.
Args:
cfg: The configuration object.
"""
super().__init__()
self.add_user(cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3])
# Define the database connection
conn = connetti_db(cfg)
# Create a cursor
cur = conn.cursor()
cur.execute(f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL")
for ftpuser, user_hash, virtpath, perm in cur.fetchall():
# Create the user's directory if it does not exist.
try:
Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
self.add_user(ftpuser, user_hash, virtpath, perm)
except Exception as e: # pylint: disable=broad-except
self.responde(f"551 Error in create virtual user path: {e}")
def validate_authentication(self: object, username: str, password: str, handler: object) -> None:
# Validate the user's password against the stored user_hash
user_hash = sha256(password.encode("UTF-8")).hexdigest()
try:
if self.user_table[username]["pwd"] != user_hash:
raise KeyError
except KeyError:
raise AuthenticationFailed # noqa: B904
# Legacy authorizer kept for reference (not used anymore)
# The DatabaseAuthorizer is now used for real-time database synchronization
class ASEHandler(FTPHandler):
@@ -143,23 +109,29 @@ def main():
cfg = setting.Config()
try:
# Initialize the authorizer and handler
authorizer = DummySha256Authorizer(cfg)
# Configure logging first
logging.basicConfig(
format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
filename=cfg.logfilename,
level=logging.INFO,
)
# Initialize the authorizer with database support
# This authorizer checks the database on every login, ensuring
# all FTP server instances stay synchronized without restarts
authorizer = DatabaseAuthorizer(cfg)
# Initialize handler
handler = ASEHandler
handler.cfg = cfg
handler.authorizer = authorizer
handler.masquerade_address = cfg.proxyaddr
# Set the range of passive ports for the FTP server
_range = list(range(cfg.firstport, cfg.firstport + cfg.portrangewidth))
handler.passive_ports = _range
# Configure logging
logging.basicConfig(
format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
# Use cfg.logfilename directly without checking its existence
filename=cfg.logfilename,
level=logging.INFO,
)
logger.info(f"Starting FTP server on port {cfg.service_port} with DatabaseAuthorizer")
# Create and start the FTP server
server = FTPServer(("0.0.0.0", cfg.service_port), handler)

View File

@@ -68,6 +68,38 @@ def fetch_data_from_db(connection: mysql.connector.MySQLConnection) -> list[tupl
cursor.close()
def fetch_existing_users(connection: mysql.connector.MySQLConnection) -> dict[str, tuple]:
"""
Fetches existing FTP users from virtusers table.
Args:
connection (mysql.connector.MySQLConnection): The database connection object.
Returns:
dict: Dictionary mapping username to (is_enabled, has_matching_password).
is_enabled is True if disabled_at is NULL.
"""
try:
cursor = connection.cursor()
query = """
SELECT ftpuser, disabled_at
FROM ase_lar.virtusers
"""
cursor.execute(query)
results = cursor.fetchall()
# Create dictionary: username -> is_enabled
users_dict = {username: (disabled_at is None) for username, disabled_at in results}
logger.info("Trovati %s utenti esistenti in virtusers", len(users_dict))
return users_dict
except mysql.connector.Error as e:
logger.error("Errore query database virtusers: %s", e)
return {}
finally:
cursor.close()
def send_site_command(ftp: FTP, command: str) -> bool:
"""
Sends a SITE command to the FTP server.
@@ -90,9 +122,13 @@ def send_site_command(ftp: FTP, command: str) -> bool:
def main():
"""
Main function to connect to the database, fetch FTP user data, and send SITE ADDU commands to the FTP server.
Main function to connect to the database, fetch FTP user data, and synchronize users to FTP server.
This function is idempotent - it can be run multiple times safely:
- If user exists and is enabled: skips
- If user exists but is disabled: enables it (SITE ENAU)
- If user doesn't exist: creates it (SITE ADDU)
"""
logger.info("Avvio script caricamento utenti FTP")
logger.info("Avvio script caricamento utenti FTP (idempotente)")
cfg = setting.Config()
# Connessioni
@@ -100,32 +136,58 @@ def main():
ftp_connection = connect_ftp()
try:
# Preleva dati dal database
data = fetch_data_from_db(db_connection)
# Preleva utenti da sincronizzare
users_to_sync = fetch_data_from_db(db_connection)
if not data:
logger.warning("Nessun dato trovato nel database")
if not users_to_sync:
logger.warning("Nessun utente da sincronizzare nel database ftp_accounts")
return
success_count = 0
# Preleva utenti già esistenti
existing_users = fetch_existing_users(db_connection)
added_count = 0
enabled_count = 0
skipped_count = 0
error_count = 0
# Processa ogni riga
for row in data:
# Processa ogni utente
for row in users_to_sync:
username, password = row
# Costruisci il comando SITE completo
ftp_site_command = f"addu {username} {password}"
if username in existing_users:
is_enabled = existing_users[username]
logger.info("Sending ftp command: %s", ftp_site_command)
if is_enabled:
# Utente già esiste ed è abilitato - skip
logger.info("Utente %s già esiste ed è abilitato - skip", username)
skipped_count += 1
else:
# Utente esiste ma è disabilitato - riabilita
logger.info("Utente %s esiste ma è disabilitato - riabilito con SITE ENAU", username)
ftp_site_command = f"enau {username}"
# Invia comando SITE
if send_site_command(ftp_connection, ftp_site_command):
success_count += 1
if send_site_command(ftp_connection, ftp_site_command):
enabled_count += 1
else:
error_count += 1
else:
error_count += 1
# Utente non esiste - crea
logger.info("Utente %s non esiste - creazione con SITE ADDU", username)
ftp_site_command = f"addu {username} {password}"
logger.info("Elaborazione completata. Successi: %s, Errori: %s", success_count, error_count)
if send_site_command(ftp_connection, ftp_site_command):
added_count += 1
else:
error_count += 1
logger.info(
"Elaborazione completata. Aggiunti: %s, Riabilitati: %s, Saltati: %s, Errori: %s",
added_count,
enabled_count,
skipped_count,
error_count
)
except Exception as e: # pylint: disable=broad-except
logger.error("Errore generale: %s", e)

View File

@@ -0,0 +1,162 @@
"""
Database-backed authorizer for FTP server that checks authentication against database in real-time.
This ensures multiple FTP server instances stay synchronized without needing restarts.
"""
import logging
from hashlib import sha256
from pathlib import Path
from pyftpdlib.authorizers import AuthenticationFailed, DummyAuthorizer
from utils.database.connection import connetti_db
logger = logging.getLogger(__name__)
class DatabaseAuthorizer(DummyAuthorizer):
"""
Custom authorizer that validates users against the database on every login.
This approach ensures that:
- Multiple FTP server instances stay synchronized
- User changes (add/remove/disable) are reflected immediately
- No server restart is needed when users are modified
"""
def __init__(self, cfg: dict) -> None:
"""
Initializes the authorizer with admin user only.
Regular users are validated against database at login time.
Args:
cfg: The configuration object.
"""
super().__init__()
self.cfg = cfg
# Add admin user to in-memory authorizer (always available)
self.add_user(
cfg.adminuser[0], # username
cfg.adminuser[1], # password hash
cfg.adminuser[2], # home directory
perm=cfg.adminuser[3] # permissions
)
logger.info("DatabaseAuthorizer initialized with admin user")
def validate_authentication(self, username: str, password: str, handler: object) -> None:
"""
Validates user authentication against the database.
This method is called on every login attempt and checks:
1. If user is admin, use in-memory credentials
2. Otherwise, query database for user credentials
3. Verify password hash matches
4. Ensure user is not disabled
Args:
username: The username attempting to login.
password: The plain-text password provided.
handler: The FTP handler object.
Raises:
AuthenticationFailed: If authentication fails for any reason.
"""
# Hash the provided password
password_hash = sha256(password.encode("UTF-8")).hexdigest()
# Check if user is admin (stored in memory)
if username == self.cfg.adminuser[0]:
if self.user_table[username]["pwd"] != password_hash:
logger.warning(f"Failed admin login attempt for user: {username}")
raise AuthenticationFailed("Invalid credentials")
return
# For regular users, check database
try:
conn = connetti_db(self.cfg)
cur = conn.cursor()
# Query user from database
cur.execute(
f"SELECT ftpuser, hash, virtpath, perm, disabled_at FROM {self.cfg.dbname}.{self.cfg.dbusertable} WHERE ftpuser = %s",
(username,)
)
result = cur.fetchone()
cur.close()
conn.close()
if not result:
logger.warning(f"Login attempt for non-existent user: {username}")
raise AuthenticationFailed("Invalid credentials")
ftpuser, stored_hash, virtpath, perm, disabled_at = result
# Check if user is disabled
if disabled_at is not None:
logger.warning(f"Login attempt for disabled user: {username}")
raise AuthenticationFailed("User account is disabled")
# Verify password
if stored_hash != password_hash:
logger.warning(f"Invalid password for user: {username}")
raise AuthenticationFailed("Invalid credentials")
# Authentication successful - ensure user directory exists
try:
Path(virtpath).mkdir(parents=True, exist_ok=True)
except Exception as e:
logger.error(f"Failed to create directory for user {username}: {e}")
raise AuthenticationFailed("System error")
# Temporarily add user to in-memory table for this session
# This allows pyftpdlib to work correctly for the duration of the session
if username not in self.user_table:
self.add_user(ftpuser, stored_hash, virtpath, perm)
logger.info(f"Successful login for user: {username}")
except AuthenticationFailed:
raise
except Exception as e:
logger.error(f"Database error during authentication for user {username}: {e}", exc_info=True)
raise AuthenticationFailed("System error")
def has_user(self, username: str) -> bool:
"""
Check if a user exists in the database or in-memory table.
This is called by pyftpdlib for various checks. We override it to check
the database as well as the in-memory table.
Args:
username: The username to check.
Returns:
True if user exists and is enabled, False otherwise.
"""
# Check in-memory first (for admin and active sessions)
if username in self.user_table:
return True
# Check database for regular users
try:
conn = connetti_db(self.cfg)
cur = conn.cursor()
cur.execute(
f"SELECT COUNT(*) FROM {self.cfg.dbname}.{self.cfg.dbusertable} WHERE ftpuser = %s AND disabled_at IS NULL",
(username,)
)
count = cur.fetchone()[0]
cur.close()
conn.close()
return count > 0
except Exception as e:
logger.error(f"Database error checking user existence for {username}: {e}")
return False