util ftp renamed connect

This commit is contained in:
2025-08-11 22:59:38 +02:00
parent dbe2e7f5a7
commit 2b976d06b3
8 changed files with 45 additions and 17 deletions

View File

@@ -0,0 +1,81 @@
import os
import logging
import re
import mysql.connector
from utils.database.connection import connetti_db
from utils.csv.parser import extract_value
logger = logging.getLogger(__name__)
def on_file_received(self: object, file: str) -> None:
"""Handles the event when a file is successfully received.
Args:
file: The path to the received file.
"""
if not os.stat(file).st_size:
os.remove(file)
logger.info(f'File {file} is empty: removed.')
else:
cfg = self.cfg
path, filenameExt = os.path.split(file)
filename, fileExtension = os.path.splitext(filenameExt)
if fileExtension.upper() in cfg.fileext:
with open(file, 'r', encoding='utf-8', errors='ignore') as csvfile:
lines = csvfile.readlines()
unit_name = extract_value(cfg.units_name, filename, str(lines[0:10]))
unit_type = extract_value(cfg.units_type, filename, str(lines[0:10]))
tool_name = extract_value(cfg.tools_name, filename, str(lines[0:10]))
tool_type = extract_value(cfg.tools_type, filename, str(lines[0:10]))
tool_info = "{}"
# If an alias exists in units_alias, use its value; check both the full unit_type and its first 3 characters (to handle CO_xxxxx)
upper_unit_type = unit_type.upper()
unit_type = cfg.units_alias.get(upper_unit_type) or \
cfg.units_alias.get(upper_unit_type[:3]) or \
upper_unit_type
upper_tool_type = tool_type.upper()
tool_type = cfg.tools_alias.get(upper_tool_type) or \
cfg.tools_alias.get(upper_tool_type[:3]) or \
upper_tool_type
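# Illustrative example (assumed config values, not part of this commit): with
# cfg.units_alias = {'CO_': 'CONTROL UNIT'}, a unit_type of 'CO_12345' resolves to
# 'CONTROL UNIT' via the 3-character prefix lookup; otherwise the uppercased value is kept.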
try:
conn = connetti_db(cfg)
except mysql.connector.Error as e:
logger.error(f'{e}')
return  # without a database connection the file cannot be processed
# Create a cursor
cur = conn.cursor()
# TODO: extract into a separate module
if (unit_type.upper() == "ISI CSV LOG" and tool_type.upper() == "VULINK" ):
serial_number = filename.split('_')[0]
tool_info = f'{{"serial_number": {serial_number}}}'
try:
cur.execute(f"SELECT unit_name, tool_name FROM {cfg.dbname}.vulink_tools WHERE serial_number = '{serial_number}'")
unit_name, tool_name = cur.fetchone()
except Exception as e:
logger.warning(f'{tool_type} serial number {serial_number} not found in table vulink_tools. {e}')
# TODO: extract into a separate module
if unit_type.upper() == "STAZIONETOTALE" and tool_type.upper() == "INTEGRITY MONITOR":
escaped_keys = [re.escape(key) for key in cfg.ts_pini_path_match.keys()]
stazione = extract_value(escaped_keys, filename)
if stazione:
tool_info = f'{{"Stazione": "{cfg.ts_pini_path_match.get(stazione)}"}}'
try:
cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info) VALUES (%s, %s, %s, %s, %s, %s, %s)", (filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), ''.join(lines), tool_info))
conn.commit()
conn.close()
except Exception as e:
logger.error(f'File {file} not loaded; left in the user path.')
logger.error(f'{e}')
else:
os.remove(file)
logger.info(f'File {file} loaded: removed.')

View File

@@ -0,0 +1,367 @@
from ftplib import FTP, FTP_TLS, all_errors
from io import BytesIO
import logging
import aiomysql
from datetime import datetime
from utils.database.loader_action import update_status, unlock
from utils.database.action_query import get_data_as_csv, get_tool_info, get_elab_timestamp
from utils.database import WorkflowFlags
logger = logging.getLogger(__name__)
class FTPConnection:
"""
Manages an FTP or FTP_TLS connection, providing a context manager for automatic disconnection.
"""
def __init__(self, host, port=21, use_tls=False, user='', passwd='',
passive=True, timeout=None, debug=0, context=None):
self.use_tls = use_tls
if use_tls:
self.ftp = FTP_TLS(context=context, timeout=timeout) if context else FTP_TLS(timeout=timeout)
else:
self.ftp = FTP(timeout=timeout)
if debug > 0:
self.ftp.set_debuglevel(debug)
self.ftp.connect(host, port)
self.ftp.login(user, passwd)
self.ftp.set_pasv(passive)
if use_tls:
self.ftp.prot_p()
def __getattr__(self, name):
"""Delega tutti i metodi non definiti all'oggetto FTP sottostante"""
return getattr(self.ftp, name)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.ftp.quit()
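# Typical usage (illustrative sketch; host, credentials and paths below are placeholders):
#     with FTPConnection(host='ftp.example.com', port=21, use_tls=True,
#                        user='user', passwd='secret', passive=True, timeout=30) as ftp:
#         ftp.cwd('/incoming')
#         ftp.storbinary('STOR data.csv', buffer)
# The connection is closed automatically via quit() when the block exits.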
async def ftp_send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, pool: object) -> bool:
"""Placeholder: sending raw CSV data to the customer via FTP is not implemented yet; always reports success."""
return True
async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
"""
Sends elaborated CSV data to a customer via FTP.
Retrieves FTP connection details from the database based on the unit name,
then establishes an FTP connection and uploads the CSV data.
Args:
cfg (dict): Configuration dictionary (not directly used in this function but passed for consistency).
id (int): The ID of the record being processed (used for logging).
unit (str): The name of the unit associated with the data.
tool (str): The name of the tool associated with the data.
csv_data (str): The CSV data as a string to be sent.
pool (object): The database connection pool.
Returns:
bool: True if the CSV data was sent successfully, False otherwise.
"""
query = """
select ftp_addrs, ftp_user, ftp_passwd, ftp_parm, ftp_filename, ftp_target, duedate from units
where name = '%s'";'
"""
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
try:
await cur.execute(query, (unit,))
send_ftp_info = await cur.fetchone()
logger.info(f"id {id} - {unit} - {tool}: estratti i dati per invio via ftp")
except Exception as e:
logger.error(f"id {id} - {unit} - {tool} - errore nella query per invio ftp: {e}")
csv_buffer = None
try:
# Convert the CSV string to bytes and wrap it in an in-memory buffer
csv_bytes = csv_data.encode('utf-8')
csv_buffer = BytesIO(csv_bytes)
ftp_parms = await parse_ftp_parms(send_ftp_info["ftp_parm"])
use_tls = 'ssl_version' in ftp_parms
passive = ftp_parms.get('passive', True)
port = ftp_parms.get('port', 21)
# FTP connection
with FTPConnection(host=send_ftp_info["ftp_addrs"], port=port, use_tls=use_tls, user=send_ftp_info["ftp_user"], passwd=send_ftp_info["ftp_passwd"], passive=passive) as ftp:
# Change to the target directory
if send_ftp_info["ftp_target"] != "/":
ftp.cwd(send_ftp_info["ftp_target"])
# Upload the file
result = ftp.storbinary(f'STOR {send_ftp_info["ftp_filename"]}', csv_buffer)
if result.startswith('226'):
logger.info(f"File {send_ftp_info['ftp_filename']} sent successfully")
return True
else:
logger.error(f"Errore nell'invio: {result}")
return False
except all_errors as e:
logger.error(f"FTP error: {e}")
return False
except Exception as e:
logger.error(f"Unexpected error: {e}")
return False
finally:
if csv_buffer is not None:
csv_buffer.close()
async def parse_ftp_parms(ftp_parms: str) -> dict:
"""
Parses a string of FTP parameters into a dictionary.
Args:
ftp_parms (str): A string containing key-value pairs separated by commas,
with keys and values separated by '=>'.
Returns:
dict: A dictionary where keys are parameter names (lowercase) and values are their parsed values.
"""
# Strip whitespace and split on commas
pairs = ftp_parms.split(',')
result = {}
for pair in pairs:
if '=>' in pair:
key, value = pair.split('=>', 1)
key = key.strip().lower()
value = value.strip().lower()
# Convert values to appropriate types
if value.isdigit():
value = int(value)
elif value == '':
value = None
result[key] = value
return result
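# Illustrative example (assumed parameter string, not from the source):
#     await parse_ftp_parms("port => 2121, passive => 1, ssl_version => tlsv1_2")
# returns {'port': 2121, 'passive': 1, 'ssl_version': 'tlsv1_2'}; the caller above
# then derives use_tls from the presence of the 'ssl_version' key.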
async def process_workflow_record(record: tuple, fase: int, cfg: dict, pool: object):
"""
Elabora un singolo record del workflow in base alla fase specificata.
Args:
record: Tupla contenente i dati del record
fase: Fase corrente del workflow
cfg: Configurazione
pool: Pool di connessioni al database
"""
# Extract and normalize the record fields
id, unit_type, tool_type, unit_name, tool_name = [
x.lower().replace(" ", "_") if isinstance(x, str) else x
for x in record
]
try:
# Retrieve the main tool information
tool_elab_info = await get_tool_info(fase, unit_name.upper(), tool_name.upper(), pool)
if tool_elab_info:
timestamp_matlab_elab = await get_elab_timestamp(id, pool)
# Check whether processing should go ahead
if not _should_process(tool_elab_info, timestamp_matlab_elab):
logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: "
"invio dati non eseguito - due date raggiunta.")
await update_status(cfg, id, fase, pool)
return
# Route by phase
success = await _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool)
if success:
await update_status(cfg, id, fase, pool)
else:
await update_status(cfg, id, fase, pool)
except Exception as e:
logger.error(f"Errore durante elaborazione id {id} - {unit_name} - {tool_name}: {e}")
raise
finally:
await unlock(cfg, id, pool)
def _should_process_old(tool_elab_info, timestamp_matlab_elab):
"""Verifica se il record può essere processato basandosi sulla due date."""
duedate = tool_elab_info.get("duedate")
if not duedate or duedate in ('0000-00-00 00:00:00', ''):
return True
# If timestamp_matlab_elab is None/null, use the current timestamp
comparison_timestamp = timestamp_matlab_elab if timestamp_matlab_elab is not None else datetime.now()
return duedate > comparison_timestamp
def _should_process(tool_elab_info, timestamp_matlab_elab):
"""Verifica se il record può essere processato basandosi sulla due date."""
duedate = tool_elab_info.get("duedate")
# If there is no due date, or it is empty/null, the record can be processed
if not duedate or duedate in ('0000-00-00 00:00:00', ''):
return True
# If timestamp_matlab_elab is None/null, use the current timestamp
comparison_timestamp = timestamp_matlab_elab if timestamp_matlab_elab is not None else datetime.now()
# Convert duedate to datetime if it is a string
if isinstance(duedate, str):
duedate = datetime.strptime(duedate, '%Y-%m-%d %H:%M:%S')
# Ensure comparison_timestamp is a datetime
if isinstance(comparison_timestamp, str):
comparison_timestamp = datetime.strptime(comparison_timestamp, '%Y-%m-%d %H:%M:%S')
return duedate > comparison_timestamp
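# Illustrative example (assumed values): with duedate = '2025-12-31 23:59:59' and
# timestamp_matlab_elab = None, the due date is parsed and compared against datetime.now(),
# so the record keeps being processed only while the due date is still in the future.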
async def _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool):
"""
Gestisce il routing delle operazioni in base alla fase del workflow.
Returns:
bool: True se l'operazione è riuscita, False altrimenti
"""
if fase == WorkflowFlags.SENT_ELAB_DATA:
return await _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name,
tool_name, timestamp_matlab_elab, pool)
elif fase == WorkflowFlags.SENT_RAW_DATA:
return await _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name,
tool_name, pool)
else:
logger.info(f"id {id} - {unit_name} - {tool_name}: nessuna azione da eseguire.")
return True
async def _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool):
"""Gestisce la fase di invio dati elaborati."""
# FTP send per dati elaborati
if tool_elab_info.get('ftp_send'):
return await _send_elab_data_ftp(cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool)
# API send for elaborated data
elif _should_send_elab_api(tool_elab_info):
return await _send_elab_data_api(cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool)
return True
async def _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, pool):
"""Gestisce la fase di invio dati raw."""
# FTP send per dati raw
if tool_elab_info.get('ftp_send_raw'):
return await _send_raw_data_ftp(cfg, id, unit_name, tool_name, pool)
# API send for raw data
elif _should_send_raw_api(tool_elab_info):
return await _send_raw_data_api(cfg, id, unit_name, tool_name, pool)
return True
def _should_send_elab_api(tool_elab_info):
"""Verifica se i dati elaborati devono essere inviati via API."""
return (tool_elab_info.get('inoltro_api') and
tool_elab_info.get('api_send') and
tool_elab_info.get('inoltro_api_url', '').strip())
def _should_send_raw_api(tool_elab_info):
"""Verifica se i dati raw devono essere inviati via API."""
return (tool_elab_info.get('inoltro_api_raw') and
tool_elab_info.get('api_send_raw') and
tool_elab_info.get('inoltro_api_url_raw', '').strip())
async def _send_elab_data_ftp(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
"""Invia dati elaborati via FTP."""
try:
elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool)
if not elab_csv:
return False
print(elab_csv)
# if await ftp_send_elab_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
if True: # placeholder for testing
return True
else:
logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP fallito.")
return False
except Exception as e:
logger.error(f"Errore invio FTP elab data id {id}: {e}")
return False
async def _send_elab_data_api(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
"""Invia dati elaborati via API."""
try:
elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool)
if not elab_csv:
return False
print(elab_csv)
# if await send_elab_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
if True: # placeholder for testing
return True
else:
logger.error(f"id {id} - {unit_name} - {tool_name}: invio API fallito.")
return False
except Exception as e:
logger.error(f"Errore invio API elab data id {id}: {e}")
return False
async def _send_raw_data_ftp(cfg, id, unit_name, tool_name, pool):
"""Invia dati raw via FTP."""
try:
# if await ftp_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool):
if True: # placeholder for testing
return True
else:
logger.error(f"id {id} - {unit_name} - {tool_name}: invio FTP raw fallito.")
return False
except Exception as e:
logger.error(f"Errore invio FTP raw data id {id}: {e}")
return False
async def _send_raw_data_api(cfg, id, unit_name, tool_name, pool):
"""Invia dati raw via API."""
try:
# if await api_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool):
if True: # placeholder for testing
return True
else:
logger.error(f"id {id} - {unit_name} - {tool_name}: invio API raw fallito.")
return False
except Exception as e:
logger.error(f"Errore invio API raw data id {id}: {e}")
return False

View File

@@ -0,0 +1,159 @@
import os
import mysql.connector
import logging
from hashlib import sha256
from pathlib import Path
from utils.database.connection import connetti_db
logger = logging.getLogger(__name__)
def ftp_SITE_ADDU(self: object, line: str) -> None:
"""
Adds a virtual user, creates their directory, and saves their details to the database.
Args:
line (str): A string containing the username and password separated by a space.
"""
cfg = self.cfg
try:
parms = line.split()
user = os.path.basename(parms[0]) # Extract the username
password = parms[1] # Get the password
hash = sha256(password.encode("UTF-8")).hexdigest() # Hash the password
except IndexError:
self.respond('501 SITE ADDU failed. Command needs 2 arguments')
else:
try:
# Create the user's directory
Path(cfg.virtpath + user).mkdir(parents=True, exist_ok=True)
except Exception as e:
self.respond(f'551 Error creating the virtual user path: {e}')
else:
try:
# Add the user to the authorizer
self.authorizer.add_user(str(user),
hash, cfg.virtpath + "/" + user, perm=cfg.defperm)
# Save the user to the database
# Define the database connection
try:
conn = connetti_db(cfg)
except mysql.connector.Error as e:
print(f"Error: {e}")
logger.error(f'{e}')
raise  # let the outer handler report the failure to the client
# Create a cursor
cur = conn.cursor()
cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('{user}', '{hash}', '{cfg.virtpath + user}', '{cfg.defperm}')")
conn.commit()
conn.close()
logger.info(f"User {user} created.")
self.respond('200 SITE ADDU successful.')
except Exception as e:
self.respond(f'501 SITE ADDU failed: {e}.')
print(e)
def ftp_SITE_DISU(self: object, line: str) -> None:
"""
Removes a virtual user from the authorizer and marks them as deleted in the database.
Args:
line (str): A string containing the username to be disabled.
"""
cfg = self.cfg
parms = line.split()
user = os.path.basename(parms[0]) # Extract the username
try:
# Remove the user from the authorizer
self.authorizer.remove_user(str(user))
# Mark the user as disabled in the database
try:
conn = connetti_db(cfg)
except mysql.connector.Error as e:
print(f"Error: {e}")
logger.error(f'{e}')
raise  # let the outer handler report the failure to the client
# Create a cursor
cur = conn.cursor()
cur.execute(f"UPDATE {cfg.dbname}.{cfg.dbusertable} SET disabled_at = now() WHERE ftpuser = '{user}'")
conn.commit()
conn.close()
logger.info(f"User {user} deleted.")
self.respond('200 SITE DISU successful.')
except Exception as e:
self.respond('501 SITE DISU failed.')
print(e)
def ftp_SITE_ENAU(self: object, line: str) -> None:
"""
Restores a virtual user by updating their status in the database and adding them back to the authorizer.
Args:
line (str): A string containing the username to be enabled.
"""
cfg = self.cfg
parms = line.split()
user = os.path.basename(parms[0]) # Extract the username
try:
# Restore the user into database
try:
conn = connetti_db(cfg)
except mysql.connector.Error as e:
print(f"Error: {e}")
logger.error(f'{e}')
raise  # let the outer handler report the failure to the client
# Create a cursor
cur = conn.cursor()
try:
cur.execute(f"UPDATE {cfg.dbname}.{cfg.dbusertable} SET disabled_at = null WHERE ftpuser = '{user}'")
conn.commit()
except Exception as e:
logger.error(f"Update DB failed: {e}")
cur.execute(f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE ftpuser = '{user}'")
ftpuser, hash, virtpath, perm = cur.fetchone()
self.authorizer.add_user(ftpuser, hash, virtpath, perm)
try:
Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
except Exception as e:
self.respond(f'551 Error creating the virtual user path: {e}')
conn.close()
logger.info(f"User {user} restored.")
self.respond('200 SITE ENAU successful.')
except Exception as e:
self.respond('501 SITE ENAU failed.')
print(e)
def ftp_SITE_LSTU(self: object, line: str) -> None:
"""
Lists all virtual users from the database.
Args:
line (str): An empty string (no arguments needed for this command).
"""
cfg = self.cfg
users_list = []
try:
# Connect to the database to fetch the users
try:
conn = connetti_db(cfg)
except mysql.connector.Error as e:
print(f"Error: {e}")
logger.error(f'{e}')
raise  # let the outer handler report the failure to the client
# Create a cursor
cur = conn.cursor()
self.push("214-The following virtual users are defined:\r\n")
cur.execute(f'SELECT ftpuser, perm, disabled_at FROM {cfg.dbname}.{cfg.dbusertable}')
for ftpuser, perm, disabled_at in cur.fetchall():
users_list.append(f'Username: {ftpuser}\tPerms: {perm}\tDisabled: {disabled_at}\r\n')
self.push(''.join(users_list))
self.respond("214 LSTU SITE command successful.")
except Exception as e:
self.respond(f'501 list users failed: {e}')