Files
ASE/src/utils/connect/file_management.py
2025-11-03 19:06:04 +01:00

132 lines
6.2 KiB
Python

import asyncio
import logging
import os
import re
from datetime import datetime
from utils.csv.parser import extract_value
from utils.database.connection import connetti_db_async
logger = logging.getLogger(__name__)
def on_file_received(self: object, file: str) -> None:
"""
Wrapper sincrono per on_file_received_async.
Questo wrapper permette di mantenere la compatibilità con il server FTP
che si aspetta una funzione sincrona, mentre internamente usa asyncio.
"""
asyncio.run(on_file_received_async(self, file))
async def on_file_received_async(self: object, file: str) -> None:
"""
Processes a received file, extracts relevant information, and inserts it into the database.
If the file is empty, it is removed. Otherwise, it extracts unit and tool
information from the filename and the first few lines of the CSV, handles
aliases, and then inserts the data into the configured database table.
Args:
file (str): The path to the received file."""
if not os.stat(file).st_size:
os.remove(file)
logger.info(f"File {file} is empty: removed.")
else:
cfg = self.cfg
path, filenameExt = os.path.split(file)
filename, fileExtension = os.path.splitext(filenameExt)
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
new_filename = f"{filename}_{timestamp}{fileExtension}"
os.rename(file, f"{path}/{new_filename}")
if fileExtension.upper() in (cfg.fileext):
with open(f"{path}/{new_filename}", encoding="utf-8", errors="ignore") as csvfile:
lines = csvfile.readlines()
unit_name = extract_value(cfg.units_name, filename, str(lines[0:10]))
unit_type = extract_value(cfg.units_type, filename, str(lines[0:10]))
tool_name = extract_value(cfg.tools_name, filename, str(lines[0:10]))
tool_type = extract_value(cfg.tools_type, filename, str(lines[0:10]))
tool_info = "{}"
# se esiste l'alias in alias_unit_type, allora prende il valore dell'alias
# verifica sia lo unit_type completo che i primi 3 caratteri per CO_xxxxx
upper_unit_type = unit_type.upper()
unit_type = cfg.units_alias.get(upper_unit_type) or cfg.units_alias.get(upper_unit_type[:3]) or upper_unit_type
upper_tool_type = tool_type.upper()
tool_type = cfg.tools_alias.get(upper_tool_type) or cfg.tools_alias.get(upper_tool_type[:3]) or upper_tool_type
try:
# Use async database connection to avoid blocking
conn = await connetti_db_async(cfg)
except Exception as e:
logger.error(f"Database connection error: {e}")
return
try:
# Create a cursor
async with conn.cursor() as cur:
# da estrarre in un modulo
if unit_type.upper() == "ISI CSV LOG" and tool_type.upper() == "VULINK":
serial_number = filename.split("_")[0]
tool_info = f'{{"serial_number": {serial_number}}}'
try:
# Use parameterized query to prevent SQL injection
await cur.execute(
f"SELECT unit_name, tool_name FROM {cfg.dbname}.vulink_tools WHERE serial_number = %s", (serial_number,)
)
result = await cur.fetchone()
if result:
unit_name, tool_name = result
except Exception as e:
logger.warning(f"{tool_type} serial number {serial_number} not found in table vulink_tools. {e}")
# da estrarre in un modulo
if unit_type.upper() == "STAZIONETOTALE" and tool_type.upper() == "INTEGRITY MONITOR":
escaped_keys = [re.escape(key) for key in cfg.ts_pini_path_match.keys()]
stazione = extract_value(escaped_keys, filename)
if stazione:
tool_info = f'{{"Stazione": "{cfg.ts_pini_path_match.get(stazione)}"}}'
# Insert file data into database
await cur.execute(
f"""INSERT INTO {cfg.dbname}.{cfg.dbrectable}
(username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info)
VALUES (%s,%s, %s, %s, %s, %s, %s, %s)""",
(
self.username,
new_filename,
unit_name.upper(),
unit_type.upper(),
tool_name.upper(),
tool_type.upper(),
"".join(lines),
tool_info,
),
)
# Note: autocommit=True in connection, no need for explicit commit
logger.info(f"File {new_filename} loaded successfully")
# Delete file after successful processing if configured
if getattr(cfg, 'delete_after_processing', False):
try:
os.remove(f"{path}/{new_filename}")
logger.info(f"File {new_filename} deleted after successful processing")
except Exception as e:
logger.warning(f"Failed to delete file {new_filename}: {e}")
except Exception as e:
logger.error(f"File {new_filename} not loaded. Held in user path.")
logger.error(f"{e}")
finally:
# Always close the connection
conn.close()
"""
else:
os.remove(file)
logger.info(f'File {new_filename} removed.')
"""