"""Workflow step that delivers raw or elaborated CSV data to customers via FTP/FTPS or API."""

import logging
import ssl
from datetime import datetime
from io import BytesIO

import aioftp
import aiomysql

from utils.database import WorkflowFlags
from utils.database.action_query import get_data_as_csv, get_elab_timestamp, get_tool_info
from utils.database.loader_action import unlock, update_status

logger = logging.getLogger(__name__)

# Defaults used when the unit's ftp_parm string omits a value or leaves it empty.
_DEFAULT_FTP_PORT = 21
_DEFAULT_FTP_TIMEOUT = 30.0
# Payloads are written to the data connection in fixed-size chunks rather than in one large write.
_UPLOAD_BLOCK_SIZE = 8192


class AsyncFTPConnection:
    """
    Manages an async FTP or FTPS (TLS) connection with context manager support.

    Thin wrapper around ``aioftp.Client`` that connects and logs in on entry and
    sends QUIT on exit.

    Args:
        host (str): FTP server hostname or IP address
        port (int): FTP server port (default: 21)
        use_tls (bool): Use FTPS with TLS encryption (default: False)
        user (str): Username for authentication (default: "")
        passwd (str): Password for authentication (default: "")
        passive (bool): Stored for API compatibility only; it is never passed to aioftp (default: True)
        timeout (float): Socket timeout in seconds handed to aioftp (default: None)

    Example:
        async with AsyncFTPConnection(host="ftp.example.com", user="user", passwd="pass") as ftp:
            await ftp.change_directory("/uploads")
            await ftp.upload(data, "filename.csv")
    """

    def __init__(
        self,
        host: str,
        port: int = 21,
        use_tls: bool = False,
        user: str = "",
        passwd: str = "",
        passive: bool = True,
        timeout: float = None,
    ):
        self.host = host
        self.port = port
        self.use_tls = use_tls
        self.user = user
        self.passwd = passwd
        self.passive = passive
        self.timeout = timeout
        self.client = None

    async def __aenter__(self):
        """Connect and log in; the connection is closed again if either step fails."""
        ssl_context = None
        if self.use_tls:
            # SECURITY NOTE(review): certificate and hostname verification are disabled so that
            # servers with self-signed certificates keep working. The TLS session is therefore
            # encrypted but NOT authenticated; tighten this if the customer servers allow it.
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

        # aioftp receives the SSL context through the Client constructor; Client.connect()
        # only accepts host and port, so passing ssl= to connect() raises TypeError.
        # NOTE(review): a constructor-level context wraps the control connection in TLS from the
        # first byte (implicit FTPS). Confirm the customer servers do not require explicit AUTH TLS.
        self.client = aioftp.Client(socket_timeout=self.timeout, ssl=ssl_context)
        try:
            await self.client.connect(self.host, self.port)
            await self.client.login(self.user, self.passwd)
        except Exception:
            # __aexit__ is not called when __aenter__ raises, so release the socket here.
            self._force_close()
            self.client = None
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Send QUIT; if that fails, log it and drop the connection anyway."""
        if self.client:
            try:
                await self.client.quit()
            except Exception as e:
                logger.warning(f"Error during FTP disconnect: {e}")
                self._force_close()

    def _force_close(self):
        """Close the underlying connection without the QUIT handshake (best effort)."""
        try:
            self.client.close()
        except Exception as e:
            # Closing is only cleanup: never let it mask the error that brought us here.
            logger.debug(f"Error while closing FTP connection: {e}")

    async def change_directory(self, path: str):
        """Change working directory on FTP server"""
        await self.client.change_directory(path)

    async def upload(self, data: bytes, filename: str) -> bool:
        """
        Upload data to FTP server.

        Args:
            data (bytes): Data to upload
            filename (str): Remote filename

        Returns:
            bool: True if upload successful, False otherwise (the error is logged, not raised)
        """
        try:
            source = BytesIO(data)
            # upload_stream() takes only the destination and yields a writable data stream;
            # leaving the ``async with`` block finalises the transfer.
            async with self.client.upload_stream(filename) as stream:
                for chunk in iter(lambda: source.read(_UPLOAD_BLOCK_SIZE), b""):
                    await stream.write(chunk)
            return True
        except Exception as e:
            logger.error(f"FTP upload error: {e}")
            return False


async def _upload_to_unit_ftp(ftp_info: dict, payload: bytes, filename: str, target: str) -> bool:
    """
    Open an FTP session from a ``units`` row and upload ``payload`` as ``filename``.

    Args:
        ftp_info (dict): Row holding ftp_addrs, ftp_user, ftp_passwd and ftp_parm.
        payload (bytes): File content to upload.
        filename (str): Remote file name.
        target (str): Remote directory; empty/None or "/" means "stay in the login directory".

    Returns:
        bool: Result of AsyncFTPConnection.upload. Connection/login errors propagate to the caller.
    """
    # ftp_parm may be NULL in the database.
    ftp_parms = await parse_ftp_parms(ftp_info["ftp_parm"] or "")

    async with AsyncFTPConnection(
        host=ftp_info["ftp_addrs"],
        # "or" (not a .get default) so that a key present with an empty value still gets the default.
        port=ftp_parms.get("port") or _DEFAULT_FTP_PORT,
        use_tls="ssl_version" in ftp_parms,
        user=ftp_info["ftp_user"],
        passwd=ftp_info["ftp_passwd"],
        passive=ftp_parms.get("passive", True),
        timeout=ftp_parms.get("timeout") or _DEFAULT_FTP_TIMEOUT,
    ) as ftp:
        if target and target != "/":
            await ftp.change_directory(target)
        return await ftp.upload(payload, filename)


async def ftp_send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, pool: object) -> bool:
    """
    Sends raw CSV data to a customer via FTP (async implementation).

    Reads the raw CSV from the ``tool_data`` column of the received-data table, then
    uploads it using the FTP settings stored for the unit. The raw-specific file name
    and target directory are preferred; the standard ones are used as fallback.

    Args:
        cfg (dict): Configuration object; ``cfg.dbname`` and ``cfg.dbrectable`` are read as
            attributes to build the table name.
        id (int): The ID of the record being processed (used for logging and DB query).
        unit (str): The name of the unit associated with the data.
        tool (str): The name of the tool associated with the data.
        pool (object): The database connection pool.

    Returns:
        bool: True if the CSV data was sent successfully, False otherwise.
    """
    # Table identifiers come from configuration, not user input; the row id is parameterised.
    raw_data_query = f"""
        SELECT tool_data
        FROM {cfg.dbname}.{cfg.dbrectable}
        WHERE id = %s
    """

    # ftp_filename / ftp_target are selected as well: they are the fallback values used below.
    ftp_info_query = """
        SELECT ftp_addrs, ftp_user, ftp_passwd, ftp_parm,
               ftp_filename_raw, ftp_target_raw,
               ftp_filename, ftp_target, duedate
        FROM units
        WHERE name = %s
    """

    async with pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            try:
                # 1. Fetch the raw CSV from the database
                await cur.execute(raw_data_query, (id,))
                raw_data_result = await cur.fetchone()

                if not raw_data_result or not raw_data_result.get("tool_data"):
                    logger.error(f"id {id} - {unit} - {tool}: nessun dato raw (tool_data) trovato nel database")
                    return False

                csv_raw_data = raw_data_result["tool_data"]
                logger.info(f"id {id} - {unit} - {tool}: estratto CSV raw dal database ({len(csv_raw_data)} bytes)")

                # 2. Fetch the FTP configuration
                await cur.execute(ftp_info_query, (unit,))
                send_ftp_info = await cur.fetchone()

                if not send_ftp_info:
                    logger.error(f"id {id} - {unit} - {tool}: nessuna configurazione FTP trovata per unit")
                    return False

                # Prefer the raw-specific file name, fall back to the standard one
                if send_ftp_info.get("ftp_filename_raw"):
                    ftp_filename = send_ftp_info["ftp_filename_raw"]
                else:
                    logger.warning(
                        f"id {id} - {unit} - {tool}: ftp_filename_raw non configurato. Uso ftp_filename standard se disponibile"
                    )
                    if not send_ftp_info.get("ftp_filename"):
                        logger.error(f"id {id} - {unit} - {tool}: nessun filename FTP configurato")
                        return False
                    ftp_filename = send_ftp_info["ftp_filename"]

                # Target directory (with fallback)
                ftp_target = send_ftp_info.get("ftp_target_raw") or send_ftp_info.get("ftp_target") or "/"

                logger.info(f"id {id} - {unit} - {tool}: configurazione FTP raw estratta")

            except Exception as e:
                logger.error(f"id {id} - {unit} - {tool} - errore nella query per invio ftp raw: {e}")
                return False

    # The DB connection is released before the (potentially slow) FTP transfer starts.
    try:
        # 3. Convert to bytes if needed
        csv_bytes = csv_raw_data.encode("utf-8") if isinstance(csv_raw_data, str) else csv_raw_data

        # 4. Connect and upload
        if await _upload_to_unit_ftp(send_ftp_info, csv_bytes, ftp_filename, ftp_target):
            logger.info(f"id {id} - {unit} - {tool}: File raw {ftp_filename} inviato con successo via FTP")
            return True

        logger.error(f"id {id} - {unit} - {tool}: Errore durante l'upload FTP raw")
        return False

    except Exception as e:
        logger.error(f"id {id} - {unit} - {tool} - Errore FTP raw: {e}", exc_info=True)
        return False


async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
    """
    Sends elaborated CSV data to a customer via FTP (async implementation).

    Looks up the unit's FTP settings in the ``units`` table, then uploads ``csv_data``
    (UTF-8 encoded) to the configured directory and file name.

    Args:
        cfg (dict): Configuration (not used in this function, kept for a uniform signature).
        id (int): The ID of the record being processed (used for logging).
        unit (str): The name of the unit associated with the data.
        tool (str): The name of the tool associated with the data.
        csv_data (str): The CSV data as a string to be sent.
        pool (object): The database connection pool.

    Returns:
        bool: True if the CSV data was sent successfully, False otherwise.
    """
    query = """
        SELECT ftp_addrs, ftp_user, ftp_passwd, ftp_parm, ftp_filename, ftp_target, duedate
        FROM units
        WHERE name = %s
    """

    async with pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            try:
                await cur.execute(query, (unit,))
                send_ftp_info = await cur.fetchone()

                if not send_ftp_info:
                    logger.error(f"id {id} - {unit} - {tool}: nessun dato FTP trovato per unit")
                    return False

                logger.info(f"id {id} - {unit} - {tool}: estratti i dati per invio via ftp")

            except Exception as e:
                logger.error(f"id {id} - {unit} - {tool} - errore nella query per invio ftp: {e}")
                return False

    # The DB connection is released before the (potentially slow) FTP transfer starts.
    try:
        csv_bytes = csv_data.encode("utf-8")

        sent = await _upload_to_unit_ftp(
            send_ftp_info,
            csv_bytes,
            send_ftp_info["ftp_filename"],
            send_ftp_info["ftp_target"],
        )
        if sent:
            logger.info(f"id {id} - {unit} - {tool}: File {send_ftp_info['ftp_filename']} inviato con successo via FTP")
            return True

        logger.error(f"id {id} - {unit} - {tool}: Errore durante l'upload FTP")
        return False

    except Exception as e:
        logger.error(f"id {id} - {unit} - {tool} - Errore FTP: {e}", exc_info=True)
        return False


async def parse_ftp_parms(ftp_parms: str) -> dict:
    """
    Parses a string of FTP parameters into a dictionary.

    Args:
        ftp_parms (str): Key-value pairs separated by commas, with keys and values
            separated by '=>'. ``None`` or an empty string yields an empty dict.

    Returns:
        dict: Keys are the lower-cased, stripped parameter names. Values are lower-cased
        and stripped; all-digit values become ``int`` and empty values become ``None``.
        Items without '=>' are ignored.
    """
    result = {}
    if not ftp_parms:
        return result

    for pair in ftp_parms.split(","):
        key, separator, value = pair.partition("=>")
        if not separator:
            continue

        key = key.strip().lower()
        value = value.strip().lower()

        # Convert the values that have an obvious native type
        if value.isdigit():
            value = int(value)
        elif value == "":
            value = None

        result[key] = value

    return result


async def process_workflow_record(record: tuple, fase: int, cfg: dict, pool: object):
    """
    Processes a single workflow record for the given phase.

    The record is always unlocked at the end, whether processing succeeded or raised.

    Args:
        record: Tuple (id, unit_type, tool_type, unit_name, tool_name).
        fase: Current workflow phase.
        cfg: Configuration.
        pool: Database connection pool.

    Raises:
        Exception: Any error raised while processing is logged and re-raised.
    """
    # Normalise the string fields: lower case, spaces replaced by underscores
    id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]

    try:
        tool_elab_info = await get_tool_info(fase, unit_name.upper(), tool_name.upper(), pool)

        if tool_elab_info:
            timestamp_matlab_elab = await get_elab_timestamp(id, pool)

            # Past the due date nothing is sent, but the phase is still marked as done
            if not _should_process(tool_elab_info, timestamp_matlab_elab):
                logger.info(
                    f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: invio dati non eseguito - due date raggiunta."
                )
                await update_status(cfg, id, fase, pool)
                return

            # NOTE(review): the status is updated whether or not the send succeeded (the original
            # code had two identical branches), so a failed send is never retried. The result is
            # deliberately unused here; confirm that this is the intended behaviour.
            await _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
            await update_status(cfg, id, fase, pool)

    except Exception as e:
        logger.error(f"Errore durante elaborazione id {id} - {unit_name} - {tool_name}: {e}")
        raise

    finally:
        await unlock(cfg, id, pool)


def _should_process(tool_elab_info: dict, timestamp_matlab_elab: datetime) -> bool:
    """
    Determines if a record should be processed based on its due date.

    Args:
        tool_elab_info (dict): Tool information; only the "duedate" key is read.
        timestamp_matlab_elab (datetime): Timestamp of the last MATLAB elaboration;
            when None the current time is used instead.

    Returns:
        bool: True when there is no due date, or the due date is later than the
        comparison timestamp; False otherwise.
    """
    duedate = tool_elab_info.get("duedate")

    # A missing, empty or zero due date means "no deadline"
    if not duedate or duedate in ("0000-00-00 00:00:00", ""):
        return True

    comparison_timestamp = timestamp_matlab_elab if timestamp_matlab_elab is not None else datetime.now()

    # Both sides may arrive as strings; bring them to datetime before comparing
    if isinstance(duedate, str):
        duedate = datetime.strptime(duedate, "%Y-%m-%d %H:%M:%S")
    if isinstance(comparison_timestamp, str):
        comparison_timestamp = datetime.strptime(comparison_timestamp, "%Y-%m-%d %H:%M:%S")

    return duedate > comparison_timestamp


async def _route_by_phase(
    fase: int, tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object
) -> bool:
    """
    Dispatches a workflow record to the handler for the current phase.

    Args:
        fase (int): Current phase (WorkflowFlags.SENT_ELAB_DATA or WorkflowFlags.SENT_RAW_DATA;
            any other value is a no-op).
        tool_elab_info (dict): Tool information, including the send flags.
        cfg (dict): The configuration.
        id (int): The ID of the record being processed.
        unit_name (str): The name of the unit associated with the data.
        tool_name (str): The name of the tool associated with the data.
        timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
        pool (object): The database connection pool.

    Returns:
        bool: True if the send succeeded or no action was needed, False otherwise.
    """
    if fase == WorkflowFlags.SENT_ELAB_DATA:
        return await _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)

    if fase == WorkflowFlags.SENT_RAW_DATA:
        return await _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, pool)

    logger.info(f"id {id} - {unit_name} - {tool_name}: nessuna azione da eseguire.")
    return True


async def _handle_elab_data_phase(
    tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object
) -> bool:
    """
    Handles the phase of sending elaborated data.

    FTP takes precedence: when "ftp_send" is set the API path is not attempted.

    Args:
        tool_elab_info (dict): Tool information, including the FTP and API send flags.
        cfg (dict): The configuration.
        id (int): The ID of the record being processed.
        unit_name (str): The name of the unit associated with the data.
        tool_name (str): The name of the tool associated with the data.
        timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
        pool (object): The database connection pool.

    Returns:
        bool: True if the send succeeded or no action was needed, False otherwise.
    """
    if tool_elab_info.get("ftp_send"):
        return await _send_elab_data_ftp(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)

    if _should_send_elab_api(tool_elab_info):
        return await _send_elab_data_api(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)

    return True


async def _handle_raw_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, pool: object) -> bool:
    """
    Handles the phase of sending raw data.

    FTP takes precedence: when "ftp_send_raw" is set the API path is not attempted.

    Args:
        tool_elab_info (dict): Tool information, including the raw FTP and API send flags.
        cfg (dict): The configuration.
        id (int): The ID of the record being processed.
        unit_name (str): The name of the unit associated with the data.
        tool_name (str): The name of the tool associated with the data.
        pool (object): The database connection pool.

    Returns:
        bool: True if the send succeeded or no action was needed, False otherwise.
    """
    if tool_elab_info.get("ftp_send_raw"):
        return await _send_raw_data_ftp(cfg, id, unit_name, tool_name, pool)

    if _should_send_raw_api(tool_elab_info):
        return await _send_raw_data_api(cfg, id, unit_name, tool_name, pool)

    return True


def _should_send_elab_api(tool_elab_info: dict) -> bool:
    """Return True when elaborated data must be forwarded via API (both flags set and a non-blank URL)."""
    # "or ''" covers a key that is present but NULL, which .get(key, "") does not.
    api_url = (tool_elab_info.get("inoltro_api_url") or "").strip()
    return bool(tool_elab_info.get("inoltro_api") and tool_elab_info.get("api_send") and api_url)


def _should_send_raw_api(tool_elab_info: dict) -> bool:
    """Return True when raw data must be forwarded via API (both flags set and a non-blank URL)."""
    # "or ''" covers a key that is present but NULL, which .get(key, "") does not.
    api_url = (tool_elab_info.get("inoltro_api_url_raw") or "").strip()
    return bool(tool_elab_info.get("inoltro_api_raw") and tool_elab_info.get("api_send_raw") and api_url)


async def _send_elab_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object) -> bool:
    """
    Sends elaborated data via FTP.

    Fetches the elaborated CSV with get_data_as_csv and uploads it with
    ftp_send_elab_csv_to_customer. Errors are logged, never raised.

    Args:
        cfg (dict): The configuration.
        id (int): The ID of the record being processed.
        unit_name (str): The name of the unit associated with the data.
        tool_name (str): The name of the tool associated with the data.
        timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
        pool (object): The database connection pool.

    Returns:
        bool: True if the FTP sending was successful, False otherwise (including "no CSV data").
    """
    try:
        elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
        if not elab_csv:
            logger.warning(f"id {id} - {unit_name} - {tool_name}: nessun dato CSV elaborato trovato")
            return False

        if await ftp_send_elab_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
            logger.info(f"id {id} - {unit_name} - {tool_name}: invio FTP completato con successo")
            return True

        logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP fallito")
        return False

    except Exception as e:
        logger.error(f"Errore invio FTP elab data id {id}: {e}", exc_info=True)
        return False


async def _send_elab_data_api(cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object) -> bool:
    """
    Sends elaborated data via API (placeholder: the actual API call is not implemented yet).

    Fetches the elaborated CSV and, if there is any, reports success without sending it.

    Args:
        cfg (dict): The configuration.
        id (int): The ID of the record being processed.
        unit_name (str): The name of the unit associated with the data.
        tool_name (str): The name of the tool associated with the data.
        timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
        pool (object): The database connection pool.

    Returns:
        bool: False when no CSV data is available or fetching it fails, True otherwise.
    """
    try:
        elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
        if not elab_csv:
            return False

        logger.debug(f"id {id} - {unit_name} - {tool_name}: CSV elaborato pronto per invio API (size: {len(elab_csv)} bytes)")
        # TODO: call the real API sender here and return its result; until it exists the
        # send is reported as successful.
        return True

    except Exception as e:
        logger.error(f"Errore invio API elab data id {id}: {e}")
        return False


async def _send_raw_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, pool: object) -> bool:
    """
    Sends raw data via FTP.

    Delegates to ftp_send_raw_csv_to_customer, which reads the raw CSV from the
    database and uploads it. Errors are logged, never raised.

    Args:
        cfg (dict): The configuration.
        id (int): The ID of the record being processed.
        unit_name (str): The name of the unit associated with the data.
        tool_name (str): The name of the tool associated with the data.
        pool (object): The database connection pool.

    Returns:
        bool: True if the FTP sending was successful, False otherwise.
    """
    try:
        if await ftp_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool):
            logger.info(f"id {id} - {unit_name} - {tool_name}: invio FTP raw completato con successo")
            return True

        logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP raw fallito")
        return False

    except Exception as e:
        logger.error(f"Errore invio FTP raw data id {id}: {e}", exc_info=True)
        return False


async def _send_raw_data_api(cfg: dict, id: int, unit_name: str, tool_name: str, pool: object) -> bool:
    """
    Sends raw data via API (placeholder: the actual API call is not implemented yet).

    Args:
        cfg (dict): The configuration.
        id (int): The ID of the record being processed.
        unit_name (str): The name of the unit associated with the data.
        tool_name (str): The name of the tool associated with the data.
        pool (object): The database connection pool.

    Returns:
        bool: Always True until the API sender is implemented.
    """
    # TODO: call the real raw-data API sender here, log failures and return its result.
    return True