initial working

2025-10-31 21:00:14 +01:00
commit c850cc6e7e
212 changed files with 24622 additions and 0 deletions

@@ -0,0 +1,123 @@
import asyncio
import logging
import os
import re
from datetime import datetime
from utils.csv.parser import extract_value
from utils.database.connection import connetti_db_async
logger = logging.getLogger(__name__)
def on_file_received(self: object, file: str) -> None:
"""
Synchronous wrapper for on_file_received_async.
This wrapper keeps compatibility with the FTP server, which expects a synchronous
callback, while internally running the async implementation via asyncio.
"""
asyncio.run(on_file_received_async(self, file))
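# Usage sketch (assumed wiring, not part of this diff): pyftpdlib invokes the handler's
# on_file_received(file) hook after each completed upload, so this synchronous wrapper is
# expected to be bound onto an FTPHandler subclass, for example:
#
#   from pyftpdlib.handlers import FTPHandler
#
#   class CSVHandler(FTPHandler):
#       on_file_received = on_file_received  # sync entry point defined above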
async def on_file_received_async(self: object, file: str) -> None:
"""
Processes a received file, extracts relevant information, and inserts it into the database.
If the file is empty, it is removed. Otherwise, it extracts unit and tool
information from the filename and the first few lines of the CSV, handles
aliases, and then inserts the data into the configured database table.
Args:
file (str): The path to the received file."""
if not os.stat(file).st_size:
os.remove(file)
logger.info(f"File {file} is empty: removed.")
else:
cfg = self.cfg
path, filenameExt = os.path.split(file)
filename, fileExtension = os.path.splitext(filenameExt)
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
new_filename = f"{filename}_{timestamp}{fileExtension}"
os.rename(file, f"{path}/{new_filename}")
if fileExtension.upper() in (cfg.fileext):
with open(f"{path}/{new_filename}", encoding="utf-8", errors="ignore") as csvfile:
lines = csvfile.readlines()
unit_name = extract_value(cfg.units_name, filename, str(lines[0:10]))
unit_type = extract_value(cfg.units_type, filename, str(lines[0:10]))
tool_name = extract_value(cfg.tools_name, filename, str(lines[0:10]))
tool_type = extract_value(cfg.tools_type, filename, str(lines[0:10]))
tool_info = "{}"
# If an alias exists in units_alias, use the alias value instead.
# Both the full unit_type and its first 3 characters are checked (for CO_xxxxx names).
upper_unit_type = unit_type.upper()
unit_type = cfg.units_alias.get(upper_unit_type) or cfg.units_alias.get(upper_unit_type[:3]) or upper_unit_type
upper_tool_type = tool_type.upper()
tool_type = cfg.tools_alias.get(upper_tool_type) or cfg.tools_alias.get(upper_tool_type[:3]) or upper_tool_type
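# Illustrative example (hypothetical alias map, not taken from this commit): with
# cfg.units_alias = {"CO_": "PIEZOMETER"}, a unit_type of "CO_12345" has no exact key,
# so its first three characters "CO_" resolve it to "PIEZOMETER"; a type with no alias
# at all simply keeps its upper-cased value.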
try:
# Use async database connection to avoid blocking
conn = await connetti_db_async(cfg)
except Exception as e:
logger.error(f"Database connection error: {e}")
return
try:
# Create a cursor
async with conn.cursor() as cur:
# to be extracted into a separate module
if unit_type.upper() == "ISI CSV LOG" and tool_type.upper() == "VULINK":
serial_number = filename.split("_")[0]
tool_info = f'{{"serial_number": {serial_number}}}'
try:
# Use parameterized query to prevent SQL injection
await cur.execute(
f"SELECT unit_name, tool_name FROM {cfg.dbname}.vulink_tools WHERE serial_number = %s", (serial_number,)
)
result = await cur.fetchone()
if result:
unit_name, tool_name = result
except Exception as e:
logger.warning(f"{tool_type} serial number {serial_number} not found in table vulink_tools. {e}")
# to be extracted into a separate module
if unit_type.upper() == "STAZIONETOTALE" and tool_type.upper() == "INTEGRITY MONITOR":
escaped_keys = [re.escape(key) for key in cfg.ts_pini_path_match.keys()]
stazione = extract_value(escaped_keys, filename)
if stazione:
tool_info = f'{{"Stazione": "{cfg.ts_pini_path_match.get(stazione)}"}}'
# Insert file data into database
await cur.execute(
f"""INSERT INTO {cfg.dbname}.{cfg.dbrectable}
(username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info)
VALUES (%s,%s, %s, %s, %s, %s, %s, %s)""",
(
self.username,
new_filename,
unit_name.upper(),
unit_type.upper(),
tool_name.upper(),
tool_type.upper(),
"".join(lines),
tool_info,
),
)
# Note: autocommit=True in connection, no need for explicit commit
logger.info(f"File {new_filename} loaded successfully")
except Exception as e:
logger.error(f"File {new_filename} not loaded. Held in user path.")
logger.error(f"{e}")
finally:
# Always close the connection
conn.close()
"""
else:
os.remove(file)
logger.info(f'File {new_filename} removed.')
"""


@@ -0,0 +1,655 @@
import logging
import ssl
from datetime import datetime
from io import BytesIO
import aioftp
import aiomysql
from utils.database import WorkflowFlags
from utils.database.action_query import get_data_as_csv, get_elab_timestamp, get_tool_info
from utils.database.loader_action import unlock, update_status
logger = logging.getLogger(__name__)
class AsyncFTPConnection:
"""
Manages an async FTP or FTPS (TLS) connection with context manager support.
This class provides a fully asynchronous FTP client using aioftp, replacing
the blocking ftplib implementation for better performance in async workflows.
Args:
host (str): FTP server hostname or IP address
port (int): FTP server port (default: 21)
use_tls (bool): Use FTPS with TLS encryption (default: False)
user (str): Username for authentication (default: "")
passwd (str): Password for authentication (default: "")
passive (bool): Use passive mode (default: True)
timeout (float): Connection timeout in seconds (default: None)
Example:
async with AsyncFTPConnection(host="ftp.example.com", user="user", passwd="pass") as ftp:
await ftp.change_directory("/uploads")
await ftp.upload(data, "filename.csv")
"""
def __init__(self, host: str, port: int = 21, use_tls: bool = False, user: str = "",
passwd: str = "", passive: bool = True, timeout: float = None):
self.host = host
self.port = port
self.use_tls = use_tls
self.user = user
self.passwd = passwd
self.passive = passive
self.timeout = timeout
self.client = None
async def __aenter__(self):
"""Async context manager entry: connect and login"""
# Create SSL context for FTPS if needed
ssl_context = None
if self.use_tls:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE # For compatibility with self-signed certs
# Create client with socket timeout; the SSL context (if any) is passed to the
# constructor, since aioftp applies it when opening the control connection.
self.client = aioftp.Client(socket_timeout=self.timeout, ssl=ssl_context)
# Connect (implicit TLS when an SSL context is set)
await self.client.connect(self.host, self.port)
# Login
await self.client.login(self.user, self.passwd)
# Set passive mode (aioftp uses passive by default, but we can configure if needed)
# Note: aioftp doesn't have explicit passive mode setting like ftplib
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit: disconnect gracefully"""
if self.client:
try:
await self.client.quit()
except Exception as e:
logger.warning(f"Error during FTP disconnect: {e}")
async def change_directory(self, path: str):
"""Change working directory on FTP server"""
await self.client.change_directory(path)
async def upload(self, data: bytes, filename: str) -> bool:
"""
Upload data to FTP server.
Args:
data (bytes): Data to upload
filename (str): Remote filename
Returns:
bool: True if upload successful, False otherwise
"""
try:
# aioftp's upload_stream() opens a write stream to the remote path;
# the in-memory bytes are written through it and the transfer is
# finalized when the stream context exits.
async with self.client.upload_stream(filename) as stream:
await stream.write(data)
return True
except Exception as e:
logger.error(f"FTP upload error: {e}")
return False
async def ftp_send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, pool: object) -> bool:
"""
Sends raw CSV data to a customer via FTP (async implementation).
Retrieves raw CSV data from the database (received.tool_data column),
then sends it to the customer via FTP using the unit's FTP configuration.
Args:
cfg (dict): Configuration dictionary.
id (int): The ID of the record being processed (used for logging and DB query).
unit (str): The name of the unit associated with the data.
tool (str): The name of the tool associated with the data.
pool (object): The database connection pool.
Returns:
bool: True if the CSV data was sent successfully, False otherwise.
"""
# Query to fetch the raw CSV from the database
raw_data_query = f"""
SELECT tool_data
FROM {cfg.dbname}.{cfg.dbrectable}
WHERE id = %s
"""
# Query to fetch the FTP configuration (the standard ftp_filename/ftp_target fields
# are selected as well, so the fallback logic below has them available)
ftp_info_query = """
SELECT ftp_addrs, ftp_user, ftp_passwd, ftp_parm, ftp_filename, ftp_filename_raw, ftp_target, ftp_target_raw, duedate
FROM units
WHERE name = %s
"""
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
try:
# 1. Fetch the raw CSV from the database
await cur.execute(raw_data_query, (id,))
raw_data_result = await cur.fetchone()
if not raw_data_result or not raw_data_result.get("tool_data"):
logger.error(f"id {id} - {unit} - {tool}: nessun dato raw (tool_data) trovato nel database")
return False
csv_raw_data = raw_data_result["tool_data"]
logger.info(f"id {id} - {unit} - {tool}: estratto CSV raw dal database ({len(csv_raw_data)} bytes)")
# 2. Fetch the FTP configuration
await cur.execute(ftp_info_query, (unit,))
send_ftp_info = await cur.fetchone()
if not send_ftp_info:
logger.error(f"id {id} - {unit} - {tool}: nessuna configurazione FTP trovata per unit")
return False
# Check that raw-data settings are configured
if not send_ftp_info.get("ftp_filename_raw"):
logger.warning(f"id {id} - {unit} - {tool}: ftp_filename_raw non configurato. Uso ftp_filename standard se disponibile")
# Fall back to the standard filename if the raw one is not configured
if not send_ftp_info.get("ftp_filename"):
logger.error(f"id {id} - {unit} - {tool}: nessun filename FTP configurato")
return False
ftp_filename = send_ftp_info["ftp_filename"]
else:
ftp_filename = send_ftp_info["ftp_filename_raw"]
# Target directory (with fallback)
ftp_target = send_ftp_info.get("ftp_target_raw") or send_ftp_info.get("ftp_target") or "/"
logger.info(f"id {id} - {unit} - {tool}: configurazione FTP raw estratta")
except Exception as e:
logger.error(f"id {id} - {unit} - {tool} - errore nella query per invio ftp raw: {e}")
return False
try:
# 3. Convert to bytes if necessary
if isinstance(csv_raw_data, str):
csv_bytes = csv_raw_data.encode("utf-8")
else:
csv_bytes = csv_raw_data
# 4. Parse FTP parameters
ftp_parms = await parse_ftp_parms(send_ftp_info["ftp_parm"] or "")
use_tls = "ssl_version" in ftp_parms
passive = ftp_parms.get("passive", True)
port = ftp_parms.get("port", 21)
timeout = ftp_parms.get("timeout", 30.0)
# 5. Async FTP connection and upload
async with AsyncFTPConnection(
host=send_ftp_info["ftp_addrs"],
port=port,
use_tls=use_tls,
user=send_ftp_info["ftp_user"],
passwd=send_ftp_info["ftp_passwd"],
passive=passive,
timeout=timeout,
) as ftp:
# Change directory if needed
if ftp_target and ftp_target != "/":
await ftp.change_directory(ftp_target)
# Upload raw data
success = await ftp.upload(csv_bytes, ftp_filename)
if success:
logger.info(f"id {id} - {unit} - {tool}: File raw {ftp_filename} inviato con successo via FTP")
return True
else:
logger.error(f"id {id} - {unit} - {tool}: Errore durante l'upload FTP raw")
return False
except Exception as e:
logger.error(f"id {id} - {unit} - {tool} - Errore FTP raw: {e}", exc_info=True)
return False
async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
"""
Sends elaborated CSV data to a customer via FTP (async implementation).
Retrieves FTP connection details from the database based on the unit name,
then establishes an async FTP connection and uploads the CSV data.
This function now uses aioftp for fully asynchronous FTP operations,
eliminating blocking I/O that previously affected event loop performance.
Args:
cfg (dict): Configuration dictionary (not directly used in this function but passed for consistency).
id (int): The ID of the record being processed (used for logging).
unit (str): The name of the unit associated with the data.
tool (str): The name of the tool associated with the data.
csv_data (str): The CSV data as a string to be sent.
pool (object): The database connection pool.
Returns:
bool: True if the CSV data was sent successfully, False otherwise.
"""
query = """
SELECT ftp_addrs, ftp_user, ftp_passwd, ftp_parm, ftp_filename, ftp_target, duedate
FROM units
WHERE name = %s
"""
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
try:
await cur.execute(query, (unit,))
send_ftp_info = await cur.fetchone()
if not send_ftp_info:
logger.error(f"id {id} - {unit} - {tool}: nessun dato FTP trovato per unit")
return False
logger.info(f"id {id} - {unit} - {tool}: estratti i dati per invio via ftp")
except Exception as e:
logger.error(f"id {id} - {unit} - {tool} - errore nella query per invio ftp: {e}")
return False
try:
# Convert to bytes
csv_bytes = csv_data.encode("utf-8")
# Parse FTP parameters
ftp_parms = await parse_ftp_parms(send_ftp_info["ftp_parm"] or "")  # guard against NULL ftp_parm, as in the raw-data path
use_tls = "ssl_version" in ftp_parms
passive = ftp_parms.get("passive", True)
port = ftp_parms.get("port", 21)
timeout = ftp_parms.get("timeout", 30.0) # Default 30 seconds
# Async FTP connection
async with AsyncFTPConnection(
host=send_ftp_info["ftp_addrs"],
port=port,
use_tls=use_tls,
user=send_ftp_info["ftp_user"],
passwd=send_ftp_info["ftp_passwd"],
passive=passive,
timeout=timeout,
) as ftp:
# Change directory if needed
if send_ftp_info["ftp_target"] and send_ftp_info["ftp_target"] != "/":
await ftp.change_directory(send_ftp_info["ftp_target"])
# Upload file
success = await ftp.upload(csv_bytes, send_ftp_info["ftp_filename"])
if success:
logger.info(f"id {id} - {unit} - {tool}: File {send_ftp_info['ftp_filename']} inviato con successo via FTP")
return True
else:
logger.error(f"id {id} - {unit} - {tool}: Errore durante l'upload FTP")
return False
except Exception as e:
logger.error(f"id {id} - {unit} - {tool} - Errore FTP: {e}", exc_info=True)
return False
async def parse_ftp_parms(ftp_parms: str) -> dict:
"""
Parses a string of FTP parameters into a dictionary.
Args:
ftp_parms (str): A string containing key-value pairs separated by commas,
with keys and values separated by '=>'.
Returns:
dict: A dictionary where keys are parameter names (lowercase) and values are their parsed values.
"""
# Remove spaces and split on commas
pairs = ftp_parms.split(",")
result = {}
for pair in pairs:
if "=>" in pair:
key, value = pair.split("=>", 1)
key = key.strip().lower()
value = value.strip().lower()
# Convert values to the appropriate types
if value.isdigit():
value = int(value)
elif value == "":
value = None
result[key] = value
return result
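# Worked example (illustrative input, assuming a typical units.ftp_parm value):
#   await parse_ftp_parms("passive => 1, port => 2121, ssl_version => tls")
#   returns {"passive": 1, "port": 2121, "ssl_version": "tls"}: digit strings become ints,
#   everything else stays a lower-cased string, and the mere presence of "ssl_version"
#   is what makes the callers above switch to FTPS.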
async def process_workflow_record(record: tuple, fase: int, cfg: dict, pool: object):
"""
Processes a single workflow record according to the specified phase.
Args:
record: Tuple containing the record data
fase: Current workflow phase
cfg: Configuration settings
pool: Database connection pool
"""
# Extract and normalize the record fields
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
try:
# Fetch the main information
tool_elab_info = await get_tool_info(fase, unit_name.upper(), tool_name.upper(), pool)
if tool_elab_info:
timestamp_matlab_elab = await get_elab_timestamp(id, pool)
# Check whether processing can proceed
if not _should_process(tool_elab_info, timestamp_matlab_elab):
logger.info(
f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: invio dati non eseguito - due date raggiunta."
)
await update_status(cfg, id, fase, pool)
return
# Route based on the phase
success = await _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
if success:
await update_status(cfg, id, fase, pool)
else:
await update_status(cfg, id, fase, pool)
except Exception as e:
logger.error(f"Errore durante elaborazione id {id} - {unit_name} - {tool_name}: {e}")
raise
finally:
await unlock(cfg, id, pool)
def _should_process(tool_elab_info: dict, timestamp_matlab_elab: datetime) -> bool:
"""
Determines if a record should be processed based on its due date.
Args:
tool_elab_info (dict): A dictionary containing information about the tool and its due date.
timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
Returns:
bool: True if the record should be processed, False otherwise."""
"""Verifica se il record può essere processato basandosi sulla due date."""
duedate = tool_elab_info.get("duedate")
# If there is no duedate, or it is empty/null, the record can be processed
if not duedate or duedate in ("0000-00-00 00:00:00", ""):
return True
# If timestamp_matlab_elab is None/null, use the current timestamp
comparison_timestamp = timestamp_matlab_elab if timestamp_matlab_elab is not None else datetime.now()
# Convert duedate to datetime if it is a string
if isinstance(duedate, str):
duedate = datetime.strptime(duedate, "%Y-%m-%d %H:%M:%S")
# Make sure comparison_timestamp is a datetime
if isinstance(comparison_timestamp, str):
comparison_timestamp = datetime.strptime(comparison_timestamp, "%Y-%m-%d %H:%M:%S")
return duedate > comparison_timestamp
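# Worked example (illustrative values): with duedate = "2025-01-01 00:00:00" and a MATLAB
# elaboration timestamp of 2025-03-15, duedate > comparison_timestamp is False and the
# record is skipped; with no duedate configured the function returns True immediately.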
async def _route_by_phase(
fase: int, tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object
) -> bool:
"""
Routes the processing of a workflow record based on the current phase.
This function acts as a dispatcher, calling the appropriate handler function
for sending elaborated data or raw data based on the `fase` (phase) parameter.
Args:
fase (int): The current phase of the workflow (e.g., WorkflowFlags.SENT_ELAB_DATA, WorkflowFlags.SENT_RAW_DATA).
tool_elab_info (dict): A dictionary containing information about the tool and its elaboration status.
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
pool (object): The database connection pool.
Returns:
bool: True if the data sending operation was successful or no action was needed, False otherwise.
"""
if fase == WorkflowFlags.SENT_ELAB_DATA:
return await _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
elif fase == WorkflowFlags.SENT_RAW_DATA:
return await _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, pool)
else:
logger.info(f"id {id} - {unit_name} - {tool_name}: nessuna azione da eseguire.")
return True
async def _handle_elab_data_phase(
tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object
) -> bool:
"""
Handles the phase of sending elaborated data.
This function checks if elaborated data needs to be sent via FTP or API
based on the `tool_elab_info` and calls the appropriate sending function.
Args:
tool_elab_info (dict): A dictionary containing information about the tool and its elaboration status,
including flags for FTP and API sending.
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
pool (object): The database connection pool.
Returns:
bool: True if the data sending operation was successful or no action was needed, False otherwise.
"""
# FTP send for elaborated data
if tool_elab_info.get("ftp_send"):
return await _send_elab_data_ftp(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
# API send for elaborated data
elif _should_send_elab_api(tool_elab_info):
return await _send_elab_data_api(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
return True
async def _handle_raw_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, pool: object) -> bool:
"""
Handles the phase of sending raw data.
This function checks if raw data needs to be sent via FTP or API
based on the `tool_elab_info` and calls the appropriate sending function.
Args:
tool_elab_info (dict): A dictionary containing information about the tool and its raw data sending status,
including flags for FTP and API sending.
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
pool (object): The database connection pool.
Returns:
bool: True if the data sending operation was successful or no action was needed, False otherwise.
"""
# FTP send for raw data
if tool_elab_info.get("ftp_send_raw"):
return await _send_raw_data_ftp(cfg, id, unit_name, tool_name, pool)
# API send for raw data
elif _should_send_raw_api(tool_elab_info):
return await _send_raw_data_api(cfg, id, unit_name, tool_name, pool)
return True
def _should_send_elab_api(tool_elab_info: dict) -> bool:
"""Verifica se i dati elaborati devono essere inviati via API."""
return tool_elab_info.get("inoltro_api") and tool_elab_info.get("api_send") and tool_elab_info.get("inoltro_api_url", "").strip()
def _should_send_raw_api(tool_elab_info: dict) -> bool:
"""Verifica se i dati raw devono essere inviati via API."""
return (
tool_elab_info.get("inoltro_api_raw")
and tool_elab_info.get("api_send_raw")
and tool_elab_info.get("inoltro_api_url_raw", "").strip()
)
async def _send_elab_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object) -> bool:
"""
Sends elaborated data via FTP.
This function retrieves the elaborated CSV data and attempts to send it
to the customer via FTP using async operations. It logs success or failure.
Args:
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
pool (object): The database connection pool.
Returns:
bool: True if the FTP sending was successful, False otherwise.
"""
try:
elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
if not elab_csv:
logger.warning(f"id {id} - {unit_name} - {tool_name}: nessun dato CSV elaborato trovato")
return False
# Send via async FTP
if await ftp_send_elab_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
logger.info(f"id {id} - {unit_name} - {tool_name}: invio FTP completato con successo")
return True
else:
logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP fallito")
return False
except Exception as e:
logger.error(f"Errore invio FTP elab data id {id}: {e}", exc_info=True)
return False
async def _send_elab_data_api(cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object) -> bool:
"""
Sends elaborated data via API.
This function retrieves the elaborated CSV data and attempts to send it
to the customer via an API. It logs success or failure.
Args:
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
pool (object): The database connection pool.
Returns:
bool: True if the API sending was successful, False otherwise.
"""
try:
elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
if not elab_csv:
return False
logger.debug(f"id {id} - {unit_name} - {tool_name}: CSV elaborato pronto per invio API (size: {len(elab_csv)} bytes)")
# if await send_elab_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
if True: # Placeholder for testing
return True
else:
logger.error(f"id {id} - {unit_name} - {tool_name}: invio API fallito.")
return False
except Exception as e:
logger.error(f"Errore invio API elab data id {id}: {e}")
return False
async def _send_raw_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, pool: object) -> bool:
"""
Sends raw data via FTP.
This function attempts to send raw CSV data to the customer via FTP
using async operations. It retrieves the raw data from the database
and uploads it to the configured FTP server.
Args:
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
pool (object): The database connection pool.
Returns:
bool: True if the FTP sending was successful, False otherwise.
"""
try:
# Send raw CSV via async FTP
if await ftp_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool):
logger.info(f"id {id} - {unit_name} - {tool_name}: invio FTP raw completato con successo")
return True
else:
logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP raw fallito")
return False
except Exception as e:
logger.error(f"Errore invio FTP raw data id {id}: {e}", exc_info=True)
return False
async def _send_raw_data_api(cfg: dict, id: int, unit_name: str, tool_name: str, pool: object) -> bool:
"""
Sends raw data via API.
This function attempts to send raw CSV data to the customer via an API.
It logs success or failure.
Args:
cfg (dict): The configuration dictionary.
id (int): The ID of the record being processed.
unit_name (str): The name of the unit associated with the data.
tool_name (str): The name of the tool associated with the data.
pool (object): The database connection pool.
Returns:
bool: True if the API sending was successful, False otherwise.
"""
try:
# if await api_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool):
if True: # Placeholder for testing
return True
else:
logger.error(f"id {id} - {unit_name} - {tool_name}: invio API raw fallito.")
return False
except Exception as e:
logger.error(f"Errore invio API raw data id {id}: {e}")
return False
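# Driver sketch (assumed caller, not part of this diff): a dispatcher is expected to fetch
# pending records and fan them out over the shared pool, roughly:
#
#   records = [(42, "ISI CSV LOG", "VuLink", "UNIT01", "TOOL01")]  # hypothetical rows
#   await asyncio.gather(
#       *(process_workflow_record(r, WorkflowFlags.SENT_ELAB_DATA, cfg, pool) for r in records)
#   )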


@@ -0,0 +1,63 @@
import logging
from email.message import EmailMessage
import aiosmtplib
from utils.config import loader_email as setting
cfg = setting.Config()
logger = logging.getLogger(__name__)
async def send_error_email(unit_name: str, tool_name: str, matlab_cmd: str, matlab_error: str, errors: list, warnings: list) -> None:
"""
Sends an error email containing details about a MATLAB processing failure.
The email includes information about the unit, tool, MATLAB command, error message,
and lists of specific errors and warnings encountered.
Args:
unit_name (str): The name of the unit involved in the processing.
tool_name (str): The name of the tool involved in the processing.
matlab_cmd (str): The MATLAB command that was executed.
matlab_error (str): The main MATLAB error message.
errors (list): A list of detailed error messages from MATLAB.
warnings (list): A list of detailed warning messages from MATLAB.
"""
# Create the message object
msg = EmailMessage()
msg["Subject"] = cfg.subject
msg["From"] = cfg.from_addr
msg["To"] = cfg.to_addr
msg["Cc"] = cfg.cc_addr
msg["Bcc"] = cfg.bcc_addr
MatlabErrors = "<br/>".join(errors)
MatlabWarnings = "<br/>".join(dict.fromkeys(warnings))
# Set the message content as HTML
msg.add_alternative(
cfg.body.format(
unit=unit_name,
tool=tool_name,
matlab_cmd=matlab_cmd,
matlab_error=matlab_error,
MatlabErrors=MatlabErrors,
MatlabWarnings=MatlabWarnings,
),
subtype="html",
)
try:
# Use async SMTP to prevent blocking the event loop
await aiosmtplib.send(
msg,
hostname=cfg.smtp_addr,
port=cfg.smtp_port,
username=cfg.smtp_user,
password=cfg.smtp_passwd,
start_tls=True,
)
logger.info("Email inviata con successo!")
except Exception as e:
logger.error(f"Errore durante l'invio dell'email: {e}")


@@ -0,0 +1,228 @@
import asyncio
import logging
import os
from hashlib import sha256
from pathlib import Path
from utils.database.connection import connetti_db_async
logger = logging.getLogger(__name__)
# Sync wrappers for FTP commands (required by pyftpdlib)
def ftp_SITE_ADDU(self: object, line: str) -> None:
"""Sync wrapper for ftp_SITE_ADDU_async."""
asyncio.run(ftp_SITE_ADDU_async(self, line))
def ftp_SITE_DISU(self: object, line: str) -> None:
"""Sync wrapper for ftp_SITE_DISU_async."""
asyncio.run(ftp_SITE_DISU_async(self, line))
def ftp_SITE_ENAU(self: object, line: str) -> None:
"""Sync wrapper for ftp_SITE_ENAU_async."""
asyncio.run(ftp_SITE_ENAU_async(self, line))
def ftp_SITE_LSTU(self: object, line: str) -> None:
"""Sync wrapper for ftp_SITE_LSTU_async."""
asyncio.run(ftp_SITE_LSTU_async(self, line))
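# Registration sketch (assumed, not shown in this diff): pyftpdlib picks up custom SITE
# commands from ftp_SITE_<NAME> methods on the handler plus an entry in proto_cmds, e.g.:
#
#   from pyftpdlib.handlers import FTPHandler
#
#   class AdminHandler(FTPHandler):
#       ftp_SITE_ADDU = ftp_SITE_ADDU
#
#   AdminHandler.proto_cmds = dict(
#       FTPHandler.proto_cmds,
#       **{"SITE ADDU": dict(perm="M", auth=True, arg=True,
#                            help="Syntax: SITE ADDU <SP> user password (add virtual user).")},
#   )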
# Async implementations
async def ftp_SITE_ADDU_async(self: object, line: str) -> None:
"""
Adds a virtual user, creates their directory, and saves their details to the database.
Args:
line (str): A string containing the username and password separated by a space.
"""
cfg = self.cfg
try:
parms = line.split()
user = os.path.basename(parms[0]) # Extract the username
password = parms[1] # Get the password
hash_value = sha256(password.encode("UTF-8")).hexdigest() # Hash the password
except IndexError:
self.respond("501 SITE ADDU failed. Command needs 2 arguments")
else:
try:
# Create the user's directory
Path(cfg.virtpath + user).mkdir(parents=True, exist_ok=True)
except Exception as e:
self.respond(f"551 Error in create virtual user path: {e}")
else:
try:
# Add the user to the authorizer
self.authorizer.add_user(str(user), hash_value, cfg.virtpath + "/" + user, perm=cfg.defperm)
# Save the user to the database using async connection
try:
conn = await connetti_db_async(cfg)
except Exception as e:
logger.error(f"Database connection error: {e}")
self.respond("501 SITE ADDU failed: Database error")
return
try:
async with conn.cursor() as cur:
# Use parameterized query to prevent SQL injection
await cur.execute(
f"INSERT INTO {cfg.dbname}.{cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES (%s, %s, %s, %s)",
(user, hash_value, cfg.virtpath + user, cfg.defperm),
)
# autocommit=True in connection
logger.info(f"User {user} created.")
self.respond("200 SITE ADDU successful.")
except Exception as e:
self.respond(f"501 SITE ADDU failed: {e}.")
logger.error(f"Error creating user {user}: {e}")
finally:
conn.close()
except Exception as e:
self.respond(f"501 SITE ADDU failed: {e}.")
logger.error(f"Error in ADDU: {e}")
async def ftp_SITE_DISU_async(self: object, line: str) -> None:
"""
Removes a virtual user from the authorizer and marks them as deleted in the database.
Args:
line (str): A string containing the username to be disabled.
"""
cfg = self.cfg
parms = line.split()
user = os.path.basename(parms[0]) # Extract the username
try:
# Remove the user from the authorizer
self.authorizer.remove_user(str(user))
# Delete the user from database
try:
conn = await connetti_db_async(cfg)
except Exception as e:
logger.error(f"Database connection error: {e}")
self.respond("501 SITE DISU failed: Database error")
return
try:
async with conn.cursor() as cur:
# Use parameterized query to prevent SQL injection
await cur.execute(f"UPDATE {cfg.dbname}.{cfg.dbusertable} SET disabled_at = NOW() WHERE ftpuser = %s", (user,))
# autocommit=True in connection
logger.info(f"User {user} deleted.")
self.respond("200 SITE DISU successful.")
except Exception as e:
logger.error(f"Error disabling user {user}: {e}")
self.respond("501 SITE DISU failed.")
finally:
conn.close()
except Exception as e:
self.respond("501 SITE DISU failed.")
logger.error(f"Error in DISU: {e}")
async def ftp_SITE_ENAU_async(self: object, line: str) -> None:
"""
Restores a virtual user by updating their status in the database and adding them back to the authorizer.
Args:
line (str): A string containing the username to be enabled.
"""
cfg = self.cfg
parms = line.split()
user = os.path.basename(parms[0]) # Extract the username
try:
# Restore the user into database
try:
conn = await connetti_db_async(cfg)
except Exception as e:
logger.error(f"Database connection error: {e}")
self.respond("501 SITE ENAU failed: Database error")
return
try:
async with conn.cursor() as cur:
# Enable the user
await cur.execute(f"UPDATE {cfg.dbname}.{cfg.dbusertable} SET disabled_at = NULL WHERE ftpuser = %s", (user,))
# Fetch user details
await cur.execute(
f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE ftpuser = %s", (user,)
)
result = await cur.fetchone()
if not result:
self.respond(f"501 SITE ENAU failed: User {user} not found")
return
ftpuser, hash_value, virtpath, perm = result
self.authorizer.add_user(ftpuser, hash_value, virtpath, perm)
try:
Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
except Exception as e:
self.respond(f"551 Error in create virtual user path: {e}")
return
logger.info(f"User {user} restored.")
self.respond("200 SITE ENAU successful.")
except Exception as e:
logger.error(f"Error enabling user {user}: {e}")
self.respond("501 SITE ENAU failed.")
finally:
conn.close()
except Exception as e:
self.respond("501 SITE ENAU failed.")
logger.error(f"Error in ENAU: {e}")
async def ftp_SITE_LSTU_async(self: object, line: str) -> None:
"""
Lists all virtual users from the database.
Args:
line (str): An empty string (no arguments needed for this command).
"""
cfg = self.cfg
users_list = []
try:
# Connect to the database to fetch users
try:
conn = await connetti_db_async(cfg)
except Exception as e:
logger.error(f"Database connection error: {e}")
self.respond("501 SITE LSTU failed: Database error")
return
try:
async with conn.cursor() as cur:
self.push("214-The following virtual users are defined:\r\n")
await cur.execute(f"SELECT ftpuser, perm, disabled_at FROM {cfg.dbname}.{cfg.dbusertable}")
results = await cur.fetchall()
for ftpuser, perm, disabled_at in results:
users_list.append(f"Username: {ftpuser}\tPerms: {perm}\tDisabled: {disabled_at}\r\n")
self.push("".join(users_list))
self.respond("214 LSTU SITE command successful.")
except Exception as e:
self.respond(f"501 list users failed: {e}")
logger.error(f"Error listing users: {e}")
finally:
conn.close()
except Exception as e:
self.respond(f"501 list users failed: {e}")
logger.error(f"Error in LSTU: {e}")