import asyncio import logging import os import re from datetime import datetime from utils.csv.parser import extract_value from utils.database.connection import connetti_db_async logger = logging.getLogger(__name__) def on_file_received(self: object, file: str) -> None: """ Wrapper sincrono per on_file_received_async. Questo wrapper permette di mantenere la compatibilità con il server FTP che si aspetta una funzione sincrona, mentre internamente usa asyncio. """ asyncio.run(on_file_received_async(self, file)) async def on_file_received_async(self: object, file: str) -> None: """ Processes a received file, extracts relevant information, and inserts it into the database. If the file is empty, it is removed. Otherwise, it extracts unit and tool information from the filename and the first few lines of the CSV, handles aliases, and then inserts the data into the configured database table. Args: file (str): The path to the received file.""" if not os.stat(file).st_size: os.remove(file) logger.info(f"File {file} is empty: removed.") else: cfg = self.cfg path, filenameExt = os.path.split(file) filename, fileExtension = os.path.splitext(filenameExt) timestamp = datetime.now().strftime("%Y%m%d%H%M%S") new_filename = f"{filename}_{timestamp}{fileExtension}" os.rename(file, f"{path}/{new_filename}") if fileExtension.upper() in (cfg.fileext): with open(f"{path}/{new_filename}", encoding="utf-8", errors="ignore") as csvfile: lines = csvfile.readlines() unit_name = extract_value(cfg.units_name, filename, str(lines[0:10])) unit_type = extract_value(cfg.units_type, filename, str(lines[0:10])) tool_name = extract_value(cfg.tools_name, filename, str(lines[0:10])) tool_type = extract_value(cfg.tools_type, filename, str(lines[0:10])) tool_info = "{}" # se esiste l'alias in alias_unit_type, allora prende il valore dell'alias # verifica sia lo unit_type completo che i primi 3 caratteri per CO_xxxxx upper_unit_type = unit_type.upper() unit_type = cfg.units_alias.get(upper_unit_type) or cfg.units_alias.get(upper_unit_type[:3]) or upper_unit_type upper_tool_type = tool_type.upper() tool_type = cfg.tools_alias.get(upper_tool_type) or cfg.tools_alias.get(upper_tool_type[:3]) or upper_tool_type try: # Use async database connection to avoid blocking conn = await connetti_db_async(cfg) except Exception as e: logger.error(f"Database connection error: {e}") return try: # Create a cursor async with conn.cursor() as cur: # da estrarre in un modulo if unit_type.upper() == "ISI CSV LOG" and tool_type.upper() == "VULINK": serial_number = filename.split("_")[0] tool_info = f'{{"serial_number": {serial_number}}}' try: # Use parameterized query to prevent SQL injection await cur.execute( f"SELECT unit_name, tool_name FROM {cfg.dbname}.vulink_tools WHERE serial_number = %s", (serial_number,) ) result = await cur.fetchone() if result: unit_name, tool_name = result except Exception as e: logger.warning(f"{tool_type} serial number {serial_number} not found in table vulink_tools. {e}") # da estrarre in un modulo if unit_type.upper() == "STAZIONETOTALE" and tool_type.upper() == "INTEGRITY MONITOR": escaped_keys = [re.escape(key) for key in cfg.ts_pini_path_match.keys()] stazione = extract_value(escaped_keys, filename) if stazione: tool_info = f'{{"Stazione": "{cfg.ts_pini_path_match.get(stazione)}"}}' # Insert file data into database await cur.execute( f"""INSERT INTO {cfg.dbname}.{cfg.dbrectable} (username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info) VALUES (%s,%s, %s, %s, %s, %s, %s, %s)""", ( self.username, new_filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), "".join(lines), tool_info, ), ) # Note: autocommit=True in connection, no need for explicit commit logger.info(f"File {new_filename} loaded successfully") # Delete file after successful processing if configured if getattr(cfg, 'delete_after_processing', False): try: os.remove(f"{path}/{new_filename}") logger.info(f"File {new_filename} deleted after successful processing") except Exception as e: logger.warning(f"Failed to delete file {new_filename}: {e}") except Exception as e: logger.error(f"File {new_filename} not loaded. Held in user path.") logger.error(f"{e}") finally: # Always close the connection conn.close() """ else: os.remove(file) logger.info(f'File {new_filename} removed.') """