This commit is contained in:
2025-09-03 21:22:35 +02:00
parent 39dba8f54a
commit 54cb20b6af
5 changed files with 139 additions and 86 deletions

2
.pylintrc Normal file
View File

@@ -0,0 +1,2 @@
# Oppure se vuoi essere più permissivo
max-line-length=140

View File

@@ -1,4 +1,7 @@
#!.venv/bin/python
"""
Orchestratore dei worker che lanciano le elaborazioni
"""
# Import necessary libraries
import logging
@@ -22,6 +25,7 @@ ELAB_PROCESSING_DELAY = 0.2
# Tempo di attesa se non ci sono record da elaborare
NO_RECORD_SLEEP = 60
async def worker(worker_id: int, cfg: object, pool: object) -> None:
"""Esegue il ciclo di lavoro per l'elaborazione dei dati caricati.
@@ -45,13 +49,13 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
logger.info("Inizio elaborazione")
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED)
if record:
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
rec_id, _, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
if tool_type.lower() != "gd": # i tool GD non devono essere elaborati ???
tool_elab_info = await get_tool_info(WorkflowFlags.DATA_ELABORATED, unit_name.upper(), tool_name.upper(), pool)
if tool_elab_info:
if tool_elab_info['statustools'].lower() in cfg.elab_status:
logger.info(f"Elaborazione id {id} per {unit_name} {tool_name} ")
await update_status(cfg, id, WorkflowFlags.START_ELAB, pool)
logger.info("Elaborazione ID %s per %s %s", rec_id, unit_name, tool_name)
await update_status(cfg, rec_id, WorkflowFlags.START_ELAB, pool)
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}"
proc = await asyncio.create_subprocess_shell(
matlab_cmd,
@@ -71,15 +75,11 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
else:
error_type = f"Matlab elab failed: {proc.returncode}."
"""
da verificare i log dove prenderli
with open(f"{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt", "w") as f:
f.write(stderr.decode().strip())
# da verificare i log dove prenderli
# with open(f"{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt", "w") as f:
# f.write(stderr.decode().strip())
# errors = [line for line in stderr.decode().strip() if line.startswith("Error")]
# warnings = [line for line in stderr.decode().strip() if not line.startswith("Error")]
"""
errors, warnings = await read_error_lines_from_logs(cfg.matlab_error_path, f"_{unit_name}_{tool_name}*_*_output_error.txt")
await send_error_email(unit_name.upper(), tool_name.upper(), tool_elab_info['matcall'], error_type, errors, warnings)
@@ -87,25 +87,25 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
else:
logger.info(stdout.decode().strip())
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await unlock(cfg, id, pool)
await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
await unlock(cfg, rec_id, pool)
await asyncio.sleep(ELAB_PROCESSING_DELAY)
else:
logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['statustools']}: MatLab calc by-passed.")
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await update_status(cfg, id, WorkflowFlags.DUMMY_ELABORATED, pool)
await unlock(cfg, id, pool)
logger.info("ID %s %s - %s %s: MatLab calc by-passed.", rec_id, unit_name, tool_name, tool_elab_info['statustools'])
await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool)
await unlock(cfg, rec_id, pool)
else:
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await update_status(cfg, id, WorkflowFlags.DUMMY_ELABORATED, pool)
await unlock(cfg, id, pool)
await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool)
await unlock(cfg, rec_id, pool)
else:
logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP)
except Exception as e:
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
except Exception as e: # pylint: disable=broad-except
logger.error("Errore durante l'esecuzione: %s", e, exc_info=debug_mode)
await asyncio.sleep(1)

View File

@@ -1,23 +1,28 @@
#!.venv/bin/python
"""This module implements an FTP server with custom commands for managing virtual users and handling CSV file uploads."""
"""
This module implements an FTP server with custom commands for
managing virtual users and handling CSV file uploads.
"""
import os
import logging
from hashlib import sha256
from pathlib import Path
from utils.config import loader_ftp_csv as setting
from utils.database.connection import connetti_db
from utils.connect import user_admin, file_management
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed
from utils.config import loader_ftp_csv as setting
from utils.database.connection import connetti_db
from utils.connect import user_admin, file_management
# Configure logging (moved inside main function)
logger = logging.getLogger(__name__)
class DummySha256Authorizer(DummyAuthorizer):
"""Custom authorizer that uses SHA256 for password hashing and manages users from a database."""
@@ -29,39 +34,44 @@ class DummySha256Authorizer(DummyAuthorizer):
"""
super().__init__()
self.add_user(
cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3])
cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3]
)
# Define the database connection
conn = connetti_db(cfg)
# Create a cursor
cur = conn.cursor()
cur.execute(f'SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL')
cur.execute(
f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL"
)
for ftpuser, hash, virtpath, perm in cur.fetchall():
self.add_user(ftpuser, hash, virtpath, perm)
"""
Create the user's directory if it does not exist.
"""
for ftpuser, user_hash, virtpath, perm in cur.fetchall():
self.add_user(ftpuser, user_hash, virtpath, perm)
# Create the user's directory if it does not exist.
try:
Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
except Exception as e:
self.responde(f'551 Error in create virtual user path: {e}')
except Exception as e: # pylint: disable=broad-except
self.responde(f"551 Error in create virtual user path: {e}")
def validate_authentication(self: object, username: str, password: str, handler: object) -> None:
# Validate the user's password against the stored hash
hash = sha256(password.encode("UTF-8")).hexdigest()
def validate_authentication(
self: object, username: str, password: str, handler: object
) -> None:
# Validate the user's password against the stored user_hash
user_hash = sha256(password.encode("UTF-8")).hexdigest()
try:
if self.user_table[username]["pwd"] != hash:
if self.user_table[username]["pwd"] != user_hash:
raise KeyError
except KeyError:
raise AuthenticationFailed
class ASEHandler(FTPHandler):
"""Custom FTP handler that extends FTPHandler with custom commands and file handling."""
def __init__(self: object, conn: object, server: object, ioloop:object=None) -> None:
def __init__(
self: object, conn: object, server: object, ioloop: object = None
) -> None:
"""Initializes the handler, adds custom commands, and sets up command permissions.
Args:
@@ -73,20 +83,44 @@ class ASEHandler(FTPHandler):
self.proto_cmds = FTPHandler.proto_cmds.copy()
# Add custom FTP commands for managing virtual users - command in lowercase
self.proto_cmds.update(
{'SITE ADDU': dict(perm='M', auth=True, arg=True,
help='Syntax: SITE <SP> ADDU USERNAME PASSWORD (add virtual user).')}
{
"SITE ADDU": dict(
perm="M",
auth=True,
arg=True,
help="Syntax: SITE <SP> ADDU USERNAME PASSWORD (add virtual user).",
)
}
)
self.proto_cmds.update(
{'SITE DISU': dict(perm='M', auth=True, arg=True,
help='Syntax: SITE <SP> DISU USERNAME (disable virtual user).')}
{
"SITE DISU": dict(
perm="M",
auth=True,
arg=True,
help="Syntax: SITE <SP> DISU USERNAME (disable virtual user).",
)
}
)
self.proto_cmds.update(
{'SITE ENAU': dict(perm='M', auth=True, arg=True,
help='Syntax: SITE <SP> ENAU USERNAME (enable virtual user).')}
{
"SITE ENAU": dict(
perm="M",
auth=True,
arg=True,
help="Syntax: SITE <SP> ENAU USERNAME (enable virtual user).",
)
}
)
self.proto_cmds.update(
{'SITE LSTU': dict(perm='M', auth=True, arg=None,
help='Syntax: SITE <SP> LSTU (list virtual users).')}
{
"SITE LSTU": dict(
perm="M",
auth=True,
arg=None,
help="Syntax: SITE <SP> LSTU (list virtual users).",
)
}
)
def on_file_received(self: object, file: str) -> None:
@@ -111,6 +145,7 @@ class ASEHandler(FTPHandler):
def ftp_SITE_LSTU(self: object, line: str) -> None:
return user_admin.ftp_SITE_LSTU(self, line)
def main():
"""Main function to start the FTP server."""
# Load the configuration settings
@@ -140,9 +175,8 @@ def main():
server.serve_forever()
except Exception as e:
logger.error(
f"Exit with error: {e}."
)
logger.error("Exit with error: %s.", e)
if __name__ == "__main__":
main()
main()

View File

@@ -1,15 +1,16 @@
#!/usr/bin/env python3
#!.venv/bin/python
"""
Script per prelevare dati da MySQL e inviare comandi SITE FTP
"""
import mysql.connector
from utils.database.connection import connetti_db
from utils.config import users_loader as setting
from ftplib import FTP
import logging
import sys
from typing import List, Tuple
import mysql.connector
from utils.database.connection import connetti_db
from utils.config import users_loader as setting
# Configurazione logging
logging.basicConfig(
@@ -38,8 +39,8 @@ def connect_ftp() -> FTP:
ftp.login(FTP_CONFIG['user'], FTP_CONFIG['password'])
logger.info("Connessione FTP stabilita")
return ftp
except Exception as e:
logger.error(f"Errore connessione FTP: {e}")
except Exception as e: # pylint: disable=broad-except
logger.error("Errore connessione FTP: %s", e)
sys.exit(1)
def fetch_data_from_db(connection: mysql.connector.MySQLConnection) -> List[Tuple]:
@@ -63,11 +64,11 @@ def fetch_data_from_db(connection: mysql.connector.MySQLConnection) -> List[Tupl
cursor.execute(query)
results = cursor.fetchall()
logger.info(f"Prelevate {len(results)} righe dal database")
logger.info("Prelevate %s righe dal database", len(results))
return results
except mysql.connector.Error as e:
logger.error(f"Errore query database: {e}")
logger.error("Errore query database: %s", e)
return []
finally:
cursor.close()
@@ -82,14 +83,13 @@ def send_site_command(ftp: FTP, command: str) -> bool:
Returns:
bool: True if the command was sent successfully, False otherwise.
"""
"""Invia un comando SITE al server FTP"""
try:
# Il comando SITE viene inviato usando sendcmd
response = ftp.sendcmd(f"SITE {command}")
logger.info(f"Comando SITE '{command}' inviato. Risposta: {response}")
logger.info("Comando SITE %s inviato. Risposta: %s", command, response)
return True
except Exception as e:
logger.error(f"Errore invio comando SITE '{command}': {e}")
except Exception as e: # pylint: disable=broad-except
logger.error("Errore invio comando SITE %s: %s", command, e)
return False
def main():
@@ -121,7 +121,7 @@ def main():
# Costruisci il comando SITE completo
ftp_site_command = f'addu {username} {password}'
logger.info(f"Sending ftp command: {ftp_site_command}")
logger.info("Sending ftp command: %s", ftp_site_command)
# Invia comando SITE
if send_site_command(ftp_connection, ftp_site_command):
@@ -129,24 +129,24 @@ def main():
else:
error_count += 1
logger.info(f"Elaborazione completata. Successi: {success_count}, Errori: {error_count}")
logger.info("Elaborazione completata. Successi: %s, Errori: %s", success_count, error_count)
except Exception as e:
logger.error(f"Errore generale: {e}")
except Exception as e: # pylint: disable=broad-except
logger.error("Errore generale: %s", e)
finally:
# Chiudi connessioni
try:
ftp_connection.quit()
logger.info("Connessione FTP chiusa")
except Exception as e:
logger.error(f"Errore chiusura connessione FTP: {e}")
except Exception as e: # pylint: disable=broad-except
logger.error("Errore chiusura connessione FTP: %s", e)
try:
db_connection.close()
logger.info("Connessione MySQL chiusa")
except Exception as e:
logger.error(f"Errore chiusura connessione MySQL: {e}")
except Exception as e: # pylint: disable=broad-except
logger.error("Errore chiusura connessione MySQL: %s", e)
if __name__ == "__main__":
main()

View File

@@ -1,4 +1,7 @@
#!.venv/bin/python
"""
Orchestratore dei worker che caricano i dati su dataraw
"""
# Import necessary libraries
import logging
@@ -19,6 +22,7 @@ CSV_PROCESSING_DELAY = 0.2
# Tempo di attesa se non ci sono record da elaborare
NO_RECORD_SLEEP = 60
async def worker(worker_id: int, cfg: object, pool: object) -> None:
"""Esegue il ciclo di lavoro per l'elaborazione dei file CSV.
@@ -39,7 +43,12 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
try:
logger.info("Inizio elaborazione")
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.CSV_RECEIVED, WorkflowFlags.DATA_LOADED)
record = await get_next_csv_atomic(
pool,
cfg.dbrectable,
WorkflowFlags.CSV_RECEIVED,
WorkflowFlags.DATA_LOADED,
)
if record:
success = await load_csv(record, cfg, pool)
@@ -50,8 +59,8 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP)
except Exception as e:
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=1)
except Exception as e: # pylint: disable=broad-except
logger.error("Errore durante l'esecuzione: %s", e, exc_info=1)
await asyncio.sleep(1)
@@ -59,7 +68,8 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
"""Carica ed elabora un record CSV utilizzando il modulo di parsing appropriato.
Args:
record: Una tupla contenente i dettagli del record CSV da elaborare (id, unit_type, tool_type, unit_name, tool_name).
record: Una tupla contenente i dettagli del record CSV da elaborare
(rec_id, unit_type, tool_type, unit_name, tool_name).
cfg: L'oggetto di configurazione contenente i parametri del sistema.
pool (object): Il pool di connessioni al database.
@@ -70,11 +80,16 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
logger.debug("Inizio ricerca nuovo CSV da elaborare")
id, unit_type, tool_type, unit_name, tool_name = [
rec_id, unit_type, tool_type, unit_name, tool_name = [
x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record
]
logger.info(
f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}"
"Trovato CSV da elaborare: ID=%s, Tipo=%s_%s, Nome=%s_%s",
rec_id,
unit_type,
tool_type,
unit_name,
tool_name,
)
# Costruisce il nome del modulo da caricare dinamicamente
@@ -87,27 +102,29 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
modulo = None
for module_name in module_names:
try:
logger.debug(f"Caricamento dinamico del modulo: {module_name}")
logger.debug("Caricamento dinamico del modulo: %s", module_name)
modulo = importlib.import_module(module_name)
logger.info(f"Funzione 'main_loader' caricata dal modulo {module_name}")
logger.info("Funzione 'main_loader' caricata dal modulo %s", module_name)
break
except (ImportError, AttributeError) as e:
logger.debug(
f"Modulo {module_name} non presente o non valido. {e}",
"Modulo %s non presente o non valido. %s",
module_name,
e,
exc_info=debug_mode,
)
if not modulo:
logger.error(f"Nessun modulo trovato {module_names}")
logger.error("Nessun modulo trovato %s", module_names)
return False
# Ottiene la funzione 'main_loader' dal modulo
funzione = getattr(modulo, "main_loader")
# Esegui la funzione
logger.info(f"Elaborazione con modulo {modulo} per ID={id}")
await funzione(cfg, id, pool)
logger.info(f"Elaborazione completata per ID={id}")
logger.info("Elaborazione con modulo %s per ID=%s", modulo, rec_id)
await funzione(cfg, rec_id, pool)
logger.info("Elaborazione completata per ID=%s", rec_id)
return True