diff --git a/src/utils/connect/send_data.py b/src/utils/connect/send_data.py index bde03ad..93aa473 100644 --- a/src/utils/connect/send_data.py +++ b/src/utils/connect/send_data.py @@ -106,7 +106,119 @@ class AsyncFTPConnection: async def ftp_send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, pool: object) -> bool: - return True + """ + Sends raw CSV data to a customer via FTP (async implementation). + + Retrieves raw CSV data from the database (received.tool_data column), + then sends it to the customer via FTP using the unit's FTP configuration. + + Args: + cfg (dict): Configuration dictionary. + id (int): The ID of the record being processed (used for logging and DB query). + unit (str): The name of the unit associated with the data. + tool (str): The name of the tool associated with the data. + pool (object): The database connection pool. + + Returns: + bool: True if the CSV data was sent successfully, False otherwise. + """ + # Query per ottenere il CSV raw dal database + raw_data_query = f""" + SELECT tool_data + FROM {cfg.dbname}.{cfg.dbrectable} + WHERE id = %s + """ + + # Query per ottenere le info FTP + ftp_info_query = """ + SELECT ftp_addrs, ftp_user, ftp_passwd, ftp_parm, ftp_filename_raw, ftp_target_raw, duedate + FROM units + WHERE name = %s + """ + + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + try: + # 1. Recupera il CSV raw dal database + await cur.execute(raw_data_query, (id,)) + raw_data_result = await cur.fetchone() + + if not raw_data_result or not raw_data_result.get("tool_data"): + logger.error(f"id {id} - {unit} - {tool}: nessun dato raw (tool_data) trovato nel database") + return False + + csv_raw_data = raw_data_result["tool_data"] + logger.info(f"id {id} - {unit} - {tool}: estratto CSV raw dal database ({len(csv_raw_data)} bytes)") + + # 2. Recupera configurazione FTP + await cur.execute(ftp_info_query, (unit,)) + send_ftp_info = await cur.fetchone() + + if not send_ftp_info: + logger.error(f"id {id} - {unit} - {tool}: nessuna configurazione FTP trovata per unit") + return False + + # Verifica che ci siano configurazioni per raw data + if not send_ftp_info.get("ftp_filename_raw"): + logger.warning(f"id {id} - {unit} - {tool}: ftp_filename_raw non configurato. Uso ftp_filename standard se disponibile") + # Fallback al filename standard se raw non รจ configurato + if not send_ftp_info.get("ftp_filename"): + logger.error(f"id {id} - {unit} - {tool}: nessun filename FTP configurato") + return False + ftp_filename = send_ftp_info["ftp_filename"] + else: + ftp_filename = send_ftp_info["ftp_filename_raw"] + + # Target directory (con fallback) + ftp_target = send_ftp_info.get("ftp_target_raw") or send_ftp_info.get("ftp_target") or "/" + + logger.info(f"id {id} - {unit} - {tool}: configurazione FTP raw estratta") + + except Exception as e: + logger.error(f"id {id} - {unit} - {tool} - errore nella query per invio ftp raw: {e}") + return False + + try: + # 3. Converti in bytes se necessario + if isinstance(csv_raw_data, str): + csv_bytes = csv_raw_data.encode("utf-8") + else: + csv_bytes = csv_raw_data + + # 4. Parse parametri FTP + ftp_parms = await parse_ftp_parms(send_ftp_info["ftp_parm"] or "") + use_tls = "ssl_version" in ftp_parms + passive = ftp_parms.get("passive", True) + port = ftp_parms.get("port", 21) + timeout = ftp_parms.get("timeout", 30.0) + + # 5. Async FTP connection e upload + async with AsyncFTPConnection( + host=send_ftp_info["ftp_addrs"], + port=port, + use_tls=use_tls, + user=send_ftp_info["ftp_user"], + passwd=send_ftp_info["ftp_passwd"], + passive=passive, + timeout=timeout, + ) as ftp: + # Change directory se necessario + if ftp_target and ftp_target != "/": + await ftp.change_directory(ftp_target) + + # Upload raw data + success = await ftp.upload(csv_bytes, ftp_filename) + + if success: + logger.info(f"id {id} - {unit} - {tool}: File raw {ftp_filename} inviato con successo via FTP") + return True + else: + logger.error(f"id {id} - {unit} - {tool}: Errore durante l'upload FTP raw") + return False + + except Exception as e: + logger.error(f"id {id} - {unit} - {tool} - Errore FTP raw: {e}", exc_info=True) + return False async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool: @@ -485,8 +597,9 @@ async def _send_raw_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, """ Sends raw data via FTP. - This function attempts to send raw CSV data to the customer via FTP. - It logs success or failure. + This function attempts to send raw CSV data to the customer via FTP + using async operations. It retrieves the raw data from the database + and uploads it to the configured FTP server. Args: cfg (dict): The configuration dictionary. @@ -499,15 +612,16 @@ async def _send_raw_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, bool: True if the FTP sending was successful, False otherwise. """ try: - # if await ftp_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool): - if True: # Placeholder per test + # Send raw CSV via async FTP + if await ftp_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool): + logger.info(f"id {id} - {unit_name} - {tool_name}: invio FTP raw completato con successo") return True else: - logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP raw fallito.") + logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP raw fallito") return False except Exception as e: - logger.error(f"Errore invio FTP raw data id {id}: {e}") + logger.error(f"Errore invio FTP raw data id {id}: {e}", exc_info=True) return False