132 lines
6.2 KiB
Python
132 lines
6.2 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import re
|
|
from datetime import datetime
|
|
|
|
from utils.csv.parser import extract_value
|
|
from utils.database.connection import connetti_db_async
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def on_file_received(self: object, file: str) -> None:
    """
    Synchronous wrapper around on_file_received_async.

    Keeps compatibility with the FTP server, which expects a synchronous
    function, while the actual processing is done with asyncio.

    Args:
        self: The handler object, passed through unchanged.
        file (str): The path to the received file.
    """
    # Build the coroutine first, then drive it to completion on a fresh event loop.
    pending = on_file_received_async(self, file)
    asyncio.run(pending)
|
|
|
|
|
|
async def on_file_received_async(self: object, file: str) -> None:
    """
    Process a received file and record its content in the database.

    An empty file is deleted and nothing else happens. A non-empty file is
    renamed in place by appending a ``_YYYYmmddHHMMSS`` timestamp to its base
    name. If its upper-cased extension is found in ``cfg.fileext``, the file is
    read, unit/tool names and types are extracted with ``extract_value`` from
    the original base name and the first ten lines, type aliases from
    ``cfg.units_alias`` / ``cfg.tools_alias`` are applied, and one row is
    inserted into ``{cfg.dbname}.{cfg.dbrectable}``. A file with any other
    extension is left renamed on disk and is not loaded.

    Database connection errors and load errors are logged, not raised; in both
    cases the renamed file stays on disk. Filesystem errors raised while
    inspecting, renaming or reading the file propagate to the caller.

    Args:
        self: Object exposing ``cfg`` (configuration) and ``username``.
        file (str): The path to the received file.
    """
    if not os.stat(file).st_size:
        os.remove(file)
        logger.info(f"File {file} is empty: removed.")
        return

    cfg = self.cfg
    path, filename_ext = os.path.split(file)
    filename, file_extension = os.path.splitext(filename_ext)
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    new_filename = f"{filename}_{timestamp}{file_extension}"
    # os.path.join keeps a bare filename (empty directory part) relative;
    # f"{path}/{name}" would have turned it into "/name" at the filesystem root.
    new_path = os.path.join(path, new_filename)
    os.rename(file, new_path)

    if file_extension.upper() not in cfg.fileext:
        # Unsupported extension: the renamed file is left on disk, not loaded.
        return

    with open(new_path, encoding="utf-8", errors="ignore") as csvfile:
        lines = csvfile.readlines()

    # Only the first ten lines are offered to the extractors, as one string.
    header = str(lines[0:10])
    # NOTE(review): the code below assumes extract_value returns strings here
    # (it calls .upper() on the results) - confirm against utils.csv.parser.
    unit_name = extract_value(cfg.units_name, filename, header)
    unit_type = extract_value(cfg.units_type, filename, header)
    tool_name = extract_value(cfg.tools_name, filename, header)
    tool_type = extract_value(cfg.tools_type, filename, header)
    tool_info = "{}"

    # If an alias exists for the type, use the alias value. Both the full
    # type and its first 3 characters (for CO_xxxxx) are looked up.
    upper_unit_type = unit_type.upper()
    unit_type = (
        cfg.units_alias.get(upper_unit_type)
        or cfg.units_alias.get(upper_unit_type[:3])
        or upper_unit_type
    )
    upper_tool_type = tool_type.upper()
    tool_type = (
        cfg.tools_alias.get(upper_tool_type)
        or cfg.tools_alias.get(upper_tool_type[:3])
        or upper_tool_type
    )

    try:
        # Use async database connection to avoid blocking
        conn = await connetti_db_async(cfg)
    except Exception as e:
        logger.error(f"Database connection error: {e}")
        return

    try:
        async with conn.cursor() as cur:
            # TODO: extract into a dedicated module
            if unit_type.upper() == "ISI CSV LOG" and tool_type.upper() == "VULINK":
                # The serial number is the part of the file name before the first "_".
                serial_number = filename.split("_")[0]
                tool_info = f'{{"serial_number": {serial_number}}}'
                try:
                    # Use parameterized query to prevent SQL injection
                    await cur.execute(
                        f"SELECT unit_name, tool_name FROM {cfg.dbname}.vulink_tools WHERE serial_number = %s",
                        (serial_number,),
                    )
                    result = await cur.fetchone()
                    if result:
                        # A matching row overrides the names extracted from the file.
                        unit_name, tool_name = result
                except Exception as e:
                    # Best effort: a failed lookup keeps the extracted names.
                    logger.warning(f"{tool_type} serial number {serial_number} not found in table vulink_tools. {e}")

            # TODO: extract into a dedicated module
            if unit_type.upper() == "STAZIONETOTALE" and tool_type.upper() == "INTEGRITY MONITOR":
                # Keys are escaped before being handed to extract_value.
                escaped_keys = [re.escape(key) for key in cfg.ts_pini_path_match.keys()]
                stazione = extract_value(escaped_keys, filename)
                if stazione:
                    tool_info = f'{{"Stazione": "{cfg.ts_pini_path_match.get(stazione)}"}}'

            # Insert file data into database
            await cur.execute(
                f"""INSERT INTO {cfg.dbname}.{cfg.dbrectable}
                (username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info)
                VALUES (%s,%s, %s, %s, %s, %s, %s, %s)""",
                (
                    self.username,
                    new_filename,
                    unit_name.upper(),
                    unit_type.upper(),
                    tool_name.upper(),
                    tool_type.upper(),
                    "".join(lines),
                    tool_info,
                ),
            )
            # Note: autocommit=True in connection, no need for explicit commit
            logger.info(f"File {new_filename} loaded successfully")

            # Delete file after successful processing if configured
            if getattr(cfg, 'delete_after_processing', False):
                try:
                    os.remove(new_path)
                    logger.info(f"File {new_filename} deleted after successful processing")
                except Exception as e:
                    logger.warning(f"Failed to delete file {new_filename}: {e}")

    except Exception as e:
        logger.error(f"File {new_filename} not loaded. Held in user path.")
        logger.error(f"{e}")

    finally:
        # Always close the connection
        conn.close()
|