From 0f91cf1fd44de2276fb2bde632632ab0d069b081 Mon Sep 17 00:00:00 2001 From: alex Date: Sat, 11 Oct 2025 21:45:20 +0200 Subject: [PATCH] feat: implement ftp_send_raw_csv_to_customer function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Complete the FTP async migration by implementing the missing ftp_send_raw_csv_to_customer() function for sending raw CSV data. ## Changes ### Implementation - Implemented ftp_send_raw_csv_to_customer(): * Retrieves raw CSV from database (received.tool_data column) * Queries FTP configuration from units table * Supports ftp_filename_raw and ftp_target_raw columns * Fallback to standard ftp_filename/ftp_target if raw not configured * Full async implementation with AsyncFTPConnection - Updated _send_raw_data_ftp(): * Removed placeholder (if True) * Now calls actual ftp_send_raw_csv_to_customer() * Enhanced error handling and logging ### Features - Dual query approach: 1. Get raw CSV data from received table by id 2. Get FTP config from units table by unit name - Smart fallback for filename/target directory - Proper error handling for missing data/config - Detailed logging for debugging - Supports both string and bytes data types ### Database Schema Support Expected columns in units table: - ftp_filename_raw (optional, fallback to ftp_filename) - ftp_target_raw (optional, fallback to ftp_target) - ftp_addrs, ftp_user, ftp_passwd, ftp_parm (required) Expected columns in received table: - tool_data (TEXT/BLOB containing raw CSV data) ## Impact - Completes raw data FTP workflow - Enables automatic sending of unprocessed CSV files to customers - Maintains consistency with elaborated data sending flow - Full async implementation (no blocking I/O) ## Testing Manual testing required with: - Database with raw CSV data in received.tool_data - Unit configuration with FTP settings - Accessible FTP/FTPS server 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/utils/connect/send_data.py | 128 +++++++++++++++++++++++++++++++-- 1 file changed, 121 insertions(+), 7 deletions(-) diff --git a/src/utils/connect/send_data.py b/src/utils/connect/send_data.py index bde03ad..93aa473 100644 --- a/src/utils/connect/send_data.py +++ b/src/utils/connect/send_data.py @@ -106,7 +106,119 @@ class AsyncFTPConnection: async def ftp_send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, pool: object) -> bool: - return True + """ + Sends raw CSV data to a customer via FTP (async implementation). + + Retrieves raw CSV data from the database (received.tool_data column), + then sends it to the customer via FTP using the unit's FTP configuration. + + Args: + cfg (dict): Configuration dictionary. + id (int): The ID of the record being processed (used for logging and DB query). + unit (str): The name of the unit associated with the data. + tool (str): The name of the tool associated with the data. + pool (object): The database connection pool. + + Returns: + bool: True if the CSV data was sent successfully, False otherwise. + """ + # Query per ottenere il CSV raw dal database + raw_data_query = f""" + SELECT tool_data + FROM {cfg.dbname}.{cfg.dbrectable} + WHERE id = %s + """ + + # Query per ottenere le info FTP + ftp_info_query = """ + SELECT ftp_addrs, ftp_user, ftp_passwd, ftp_parm, ftp_filename_raw, ftp_target_raw, duedate + FROM units + WHERE name = %s + """ + + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + try: + # 1. Recupera il CSV raw dal database + await cur.execute(raw_data_query, (id,)) + raw_data_result = await cur.fetchone() + + if not raw_data_result or not raw_data_result.get("tool_data"): + logger.error(f"id {id} - {unit} - {tool}: nessun dato raw (tool_data) trovato nel database") + return False + + csv_raw_data = raw_data_result["tool_data"] + logger.info(f"id {id} - {unit} - {tool}: estratto CSV raw dal database ({len(csv_raw_data)} bytes)") + + # 2. Recupera configurazione FTP + await cur.execute(ftp_info_query, (unit,)) + send_ftp_info = await cur.fetchone() + + if not send_ftp_info: + logger.error(f"id {id} - {unit} - {tool}: nessuna configurazione FTP trovata per unit") + return False + + # Verifica che ci siano configurazioni per raw data + if not send_ftp_info.get("ftp_filename_raw"): + logger.warning(f"id {id} - {unit} - {tool}: ftp_filename_raw non configurato. Uso ftp_filename standard se disponibile") + # Fallback al filename standard se raw non è configurato + if not send_ftp_info.get("ftp_filename"): + logger.error(f"id {id} - {unit} - {tool}: nessun filename FTP configurato") + return False + ftp_filename = send_ftp_info["ftp_filename"] + else: + ftp_filename = send_ftp_info["ftp_filename_raw"] + + # Target directory (con fallback) + ftp_target = send_ftp_info.get("ftp_target_raw") or send_ftp_info.get("ftp_target") or "/" + + logger.info(f"id {id} - {unit} - {tool}: configurazione FTP raw estratta") + + except Exception as e: + logger.error(f"id {id} - {unit} - {tool} - errore nella query per invio ftp raw: {e}") + return False + + try: + # 3. Converti in bytes se necessario + if isinstance(csv_raw_data, str): + csv_bytes = csv_raw_data.encode("utf-8") + else: + csv_bytes = csv_raw_data + + # 4. Parse parametri FTP + ftp_parms = await parse_ftp_parms(send_ftp_info["ftp_parm"] or "") + use_tls = "ssl_version" in ftp_parms + passive = ftp_parms.get("passive", True) + port = ftp_parms.get("port", 21) + timeout = ftp_parms.get("timeout", 30.0) + + # 5. Async FTP connection e upload + async with AsyncFTPConnection( + host=send_ftp_info["ftp_addrs"], + port=port, + use_tls=use_tls, + user=send_ftp_info["ftp_user"], + passwd=send_ftp_info["ftp_passwd"], + passive=passive, + timeout=timeout, + ) as ftp: + # Change directory se necessario + if ftp_target and ftp_target != "/": + await ftp.change_directory(ftp_target) + + # Upload raw data + success = await ftp.upload(csv_bytes, ftp_filename) + + if success: + logger.info(f"id {id} - {unit} - {tool}: File raw {ftp_filename} inviato con successo via FTP") + return True + else: + logger.error(f"id {id} - {unit} - {tool}: Errore durante l'upload FTP raw") + return False + + except Exception as e: + logger.error(f"id {id} - {unit} - {tool} - Errore FTP raw: {e}", exc_info=True) + return False async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool: @@ -485,8 +597,9 @@ async def _send_raw_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, """ Sends raw data via FTP. - This function attempts to send raw CSV data to the customer via FTP. - It logs success or failure. + This function attempts to send raw CSV data to the customer via FTP + using async operations. It retrieves the raw data from the database + and uploads it to the configured FTP server. Args: cfg (dict): The configuration dictionary. @@ -499,15 +612,16 @@ async def _send_raw_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, bool: True if the FTP sending was successful, False otherwise. """ try: - # if await ftp_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool): - if True: # Placeholder per test + # Send raw CSV via async FTP + if await ftp_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool): + logger.info(f"id {id} - {unit_name} - {tool_name}: invio FTP raw completato con successo") return True else: - logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP raw fallito.") + logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP raw fallito") return False except Exception as e: - logger.error(f"Errore invio FTP raw data id {id}: {e}") + logger.error(f"Errore invio FTP raw data id {id}: {e}", exc_info=True) return False