From fb2b2724edaa8bc1029bc101ae51a52aafa56105 Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 22 Sep 2025 22:30:54 +0200 Subject: [PATCH] lint con ruff --- .vscode/{setting.json => settings_ko.json} | 2 + pyproject.toml | 24 +++ src/elab_orchestrator.py | 42 ++-- src/ftp_csv_receiver.py | 80 ++++---- src/load_ftp_users.py | 47 ++--- src/load_orchestrator.py | 14 +- src/send_orchestrator.py | 8 +- src/utils/config/loader_email.py | 6 +- src/utils/config/loader_ftp_csv.py | 34 ++-- src/utils/config/loader_load_data.py | 5 +- src/utils/config/loader_matlab_elab.py | 9 +- src/utils/config/loader_send_data.py | 5 +- src/utils/config/users_loader.py | 7 +- src/utils/connect/file_management.py | 53 +++-- src/utils/connect/send_data.py | 117 +++++------ src/utils/connect/send_email.py | 31 ++- src/utils/connect/user_admin.py | 60 +++--- src/utils/csv/data_preparation.py | 192 ++++++++++++------ src/utils/csv/loaders.py | 52 +++-- src/utils/csv/parser.py | 6 +- src/utils/database/__init__.py | 21 +- src/utils/database/action_query.py | 27 +-- src/utils/database/connection.py | 10 +- src/utils/database/loader_action.py | 37 ++-- src/utils/database/nodes_query.py | 13 +- src/utils/general.py | 12 +- src/utils/orchestrator_utils.py | 22 +- src/utils/parsers/by_type/cr1000x_cr1000x.py | 3 +- src/utils/parsers/by_type/d2w_d2w.py | 3 +- src/utils/parsers/by_type/g201_g201.py | 3 +- src/utils/parsers/by_type/g301_g301.py | 3 +- src/utils/parsers/by_type/g801_iptm.py | 3 +- src/utils/parsers/by_type/g801_loc.py | 1 + src/utils/parsers/by_type/g801_mums.py | 1 + src/utils/parsers/by_type/g801_musa.py | 3 +- src/utils/parsers/by_type/g801_mux.py | 3 +- src/utils/parsers/by_type/g802_dsas.py | 3 +- src/utils/parsers/by_type/g802_gd.py | 3 +- src/utils/parsers/by_type/g802_loc.py | 3 +- src/utils/parsers/by_type/g802_modb.py | 3 +- src/utils/parsers/by_type/g802_mums.py | 3 +- src/utils/parsers/by_type/g802_mux.py | 3 +- src/utils/parsers/by_type/gs1_gs1.py | 3 +- .../parsers/by_type/hirpinia_hirpinia.py | 2 +- src/utils/parsers/by_type/hortus_hortus.py | 4 +- .../parsers/by_type/isi_csv_log_vulink.py | 2 +- src/utils/parsers/by_type/sisgeo_health.py | 2 +- src/utils/parsers/by_type/sisgeo_readings.py | 2 +- src/utils/parsers/by_type/sorotecpini_co.py | 2 +- .../stazionetotale_integrity_monitor.py | 2 +- .../by_type/stazionetotale_messpunktepini.py | 2 +- src/utils/parsers/by_type/tlp_loc.py | 3 +- src/utils/parsers/by_type/tlp_tlp.py | 3 +- src/utils/timestamp/date_check.py | 13 +- 54 files changed, 585 insertions(+), 432 deletions(-) rename .vscode/{setting.json => settings_ko.json} (51%) diff --git a/.vscode/setting.json b/.vscode/settings_ko.json similarity index 51% rename from .vscode/setting.json rename to .vscode/settings_ko.json index 0bff96c..97203ab 100644 --- a/.vscode/setting.json +++ b/.vscode/settings_ko.json @@ -1,4 +1,6 @@ { + "python.analysis.autoImportCompletions": true, + "python.analysis.typeCheckingMode": "standard", "flake8.args": ["--max-line-length=140"], "python.linting.flake8Args": ["--config","flake8.cfg"] } \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index f6b41f1..47f0729 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,3 +29,27 @@ package-dir = {"" = "src"} [tool.setuptools.packages.find] exclude = ["test","build"] where = ["src"] + +[tool.ruff] +# Lunghezza massima della riga +line-length = 160 + +[tool.ruff.lint] +# Regole di linting da abilitare +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes + "I", # isort + "B", # flake8-bugbear + "C4", # flake8-comprehensions + "UP", # pyupgrade +] + +# Regole da ignorare +ignore = [] + +[tool.ruff.format] +# Usa virgole finali +quote-style = "double" +indent-style = "space" \ No newline at end of file diff --git a/src/elab_orchestrator.py b/src/elab_orchestrator.py index b66b0d4..1fb810e 100755 --- a/src/elab_orchestrator.py +++ b/src/elab_orchestrator.py @@ -4,18 +4,18 @@ Orchestratore dei worker che lanciano le elaborazioni """ # Import necessary libraries -import logging import asyncio +import logging # Import custom modules for configuration and database connection from utils.config import loader_matlab_elab as setting -from utils.database import WorkflowFlags -from utils.database.action_query import get_tool_info, check_flag_elab -from utils.csv.loaders import get_next_csv_atomic -from utils.orchestrator_utils import run_orchestrator, worker_context -from utils.database.loader_action import update_status, unlock from utils.connect.send_email import send_error_email +from utils.csv.loaders import get_next_csv_atomic +from utils.database import WorkflowFlags +from utils.database.action_query import check_flag_elab, get_tool_info +from utils.database.loader_action import unlock, update_status from utils.general import read_error_lines_from_logs +from utils.orchestrator_utils import run_orchestrator, worker_context # Initialize the logger for this module logger = logging.getLogger() @@ -51,18 +51,16 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None: record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED) if record: rec_id, _, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record] - if tool_type.lower() != "gd": # i tool GD non devono essere elaborati ??? + if tool_type.lower() != "gd": # i tool GD non devono essere elaborati ??? tool_elab_info = await get_tool_info(WorkflowFlags.DATA_ELABORATED, unit_name.upper(), tool_name.upper(), pool) if tool_elab_info: - if tool_elab_info['statustools'].lower() in cfg.elab_status: + if tool_elab_info["statustools"].lower() in cfg.elab_status: logger.info("Elaborazione ID %s per %s %s", rec_id, unit_name, tool_name) await update_status(cfg, rec_id, WorkflowFlags.START_ELAB, pool) - matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}" + matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh \ + {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}" proc = await asyncio.create_subprocess_shell( - matlab_cmd, - cwd=cfg.matlab_func_path, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE + matlab_cmd, cwd=cfg.matlab_func_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() @@ -82,9 +80,12 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None: # errors = [line for line in stderr.decode().strip() if line.startswith("Error")] # warnings = [line for line in stderr.decode().strip() if not line.startswith("Error")] - errors, warnings = await read_error_lines_from_logs(cfg.matlab_error_path, f"_{unit_name}_{tool_name}*_*_output_error.txt") - await send_error_email(unit_name.upper(), tool_name.upper(), tool_elab_info['matcall'], error_type, errors, warnings) - + errors, warnings = await read_error_lines_from_logs( + cfg.matlab_error_path, f"_{unit_name}_{tool_name}*_*_output_error.txt" + ) + await send_error_email( + unit_name.upper(), tool_name.upper(), tool_elab_info["matcall"], error_type, errors, warnings + ) else: logger.info(stdout.decode().strip()) @@ -92,7 +93,9 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None: await unlock(cfg, rec_id, pool) await asyncio.sleep(ELAB_PROCESSING_DELAY) else: - logger.info("ID %s %s - %s %s: MatLab calc by-passed.", rec_id, unit_name, tool_name, tool_elab_info['statustools']) + logger.info( + "ID %s %s - %s %s: MatLab calc by-passed.", rec_id, unit_name, tool_name, tool_elab_info["statustools"] + ) await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool) await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool) await unlock(cfg, rec_id, pool) @@ -108,7 +111,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None: logger.info("Flag fermo elaborazione attivato") await asyncio.sleep(NO_RECORD_SLEEP) - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except logger.error("Errore durante l'esecuzione: %s", e, exc_info=debug_mode) await asyncio.sleep(1) @@ -117,5 +120,6 @@ async def main(): """Funzione principale che avvia l'elab_orchestrator.""" await run_orchestrator(setting.Config, worker) + if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/src/ftp_csv_receiver.py b/src/ftp_csv_receiver.py index 98ccf1d..5103080 100755 --- a/src/ftp_csv_receiver.py +++ b/src/ftp_csv_receiver.py @@ -1,22 +1,21 @@ #!.venv/bin/python """ - This module implements an FTP server with custom commands for - managing virtual users and handling CSV file uploads. +This module implements an FTP server with custom commands for +managing virtual users and handling CSV file uploads. """ -import os import logging - +import os from hashlib import sha256 from pathlib import Path +from pyftpdlib.authorizers import AuthenticationFailed, DummyAuthorizer from pyftpdlib.handlers import FTPHandler from pyftpdlib.servers import FTPServer -from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed from utils.config import loader_ftp_csv as setting +from utils.connect import file_management, user_admin from utils.database.connection import connetti_db -from utils.connect import user_admin, file_management # Configure logging (moved inside main function) @@ -33,46 +32,37 @@ class DummySha256Authorizer(DummyAuthorizer): cfg: The configuration object. """ super().__init__() - self.add_user( - cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3] - ) + self.add_user(cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3]) # Define the database connection conn = connetti_db(cfg) # Create a cursor cur = conn.cursor() - cur.execute( - f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL" - ) + cur.execute(f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL") for ftpuser, user_hash, virtpath, perm in cur.fetchall(): # Create the user's directory if it does not exist. try: Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True) self.add_user(ftpuser, user_hash, virtpath, perm) - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except self.responde(f"551 Error in create virtual user path: {e}") - - def validate_authentication( - self: object, username: str, password: str, handler: object - ) -> None: + def validate_authentication(self: object, username: str, password: str, handler: object) -> None: # Validate the user's password against the stored user_hash user_hash = sha256(password.encode("UTF-8")).hexdigest() try: if self.user_table[username]["pwd"] != user_hash: raise KeyError except KeyError: - raise AuthenticationFailed + raise AuthenticationFailed # noqa: B904 class ASEHandler(FTPHandler): """Custom FTP handler that extends FTPHandler with custom commands and file handling.""" - def __init__( - self: object, conn: object, server: object, ioloop: object = None - ) -> None: + def __init__(self: object, conn: object, server: object, ioloop: object = None) -> None: """Initializes the handler, adds custom commands, and sets up command permissions. Args: @@ -85,42 +75,42 @@ class ASEHandler(FTPHandler): # Add custom FTP commands for managing virtual users - command in lowercase self.proto_cmds.update( { - "SITE ADDU": dict( - perm="M", - auth=True, - arg=True, - help="Syntax: SITE ADDU USERNAME PASSWORD (add virtual user).", - ) + "SITE ADDU": { + "perm": "M", + "auth": True, + "arg": True, + "help": "Syntax: SITE ADDU USERNAME PASSWORD (add virtual user).", + } } ) self.proto_cmds.update( { - "SITE DISU": dict( - perm="M", - auth=True, - arg=True, - help="Syntax: SITE DISU USERNAME (disable virtual user).", - ) + "SITE DISU": { + "perm": "M", + "auth": True, + "arg": True, + "help": "Syntax: SITE DISU USERNAME (disable virtual user).", + } } ) self.proto_cmds.update( { - "SITE ENAU": dict( - perm="M", - auth=True, - arg=True, - help="Syntax: SITE ENAU USERNAME (enable virtual user).", - ) + "SITE ENAU": { + "perm": "M", + "auth": True, + "arg": True, + "help": "Syntax: SITE ENAU USERNAME (enable virtual user).", + } } ) self.proto_cmds.update( { - "SITE LSTU": dict( - perm="M", - auth=True, - arg=None, - help="Syntax: SITE LSTU (list virtual users).", - ) + "SITE LSTU": { + "perm": "M", + "auth": True, + "arg": None, + "help": "Syntax: SITE LSTU (list virtual users).", + } } ) diff --git a/src/load_ftp_users.py b/src/load_ftp_users.py index 831e023..ae06a02 100644 --- a/src/load_ftp_users.py +++ b/src/load_ftp_users.py @@ -3,29 +3,22 @@ Script per prelevare dati da MySQL e inviare comandi SITE FTP """ -from ftplib import FTP import logging import sys -from typing import List, Tuple -import mysql.connector -from utils.database.connection import connetti_db -from utils.config import users_loader as setting +from ftplib import FTP +import mysql.connector + +from utils.config import users_loader as setting +from utils.database.connection import connetti_db # Configurazione logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - %(message)s' -) +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) # Configurazione server FTP -FTP_CONFIG = { - 'host': 'localhost', - 'user': 'admin', - 'password': 'batt1l0', - 'port': 2121 -} +FTP_CONFIG = {"host": "localhost", "user": "admin", "password": "batt1l0", "port": 2121} + def connect_ftp() -> FTP: """ @@ -35,15 +28,16 @@ def connect_ftp() -> FTP: """ try: ftp = FTP() - ftp.connect(FTP_CONFIG['host'], FTP_CONFIG['port']) - ftp.login(FTP_CONFIG['user'], FTP_CONFIG['password']) + ftp.connect(FTP_CONFIG["host"], FTP_CONFIG["port"]) + ftp.login(FTP_CONFIG["user"], FTP_CONFIG["password"]) logger.info("Connessione FTP stabilita") return ftp - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except logger.error("Errore connessione FTP: %s", e) sys.exit(1) -def fetch_data_from_db(connection: mysql.connector.MySQLConnection) -> List[Tuple]: + +def fetch_data_from_db(connection: mysql.connector.MySQLConnection) -> list[tuple]: """ Fetches username and password data from the 'ftp_accounts' table in the database. @@ -73,6 +67,7 @@ def fetch_data_from_db(connection: mysql.connector.MySQLConnection) -> List[Tupl finally: cursor.close() + def send_site_command(ftp: FTP, command: str) -> bool: """ Sends a SITE command to the FTP server. @@ -88,10 +83,11 @@ def send_site_command(ftp: FTP, command: str) -> bool: response = ftp.sendcmd(f"SITE {command}") logger.info("Comando SITE %s inviato. Risposta: %s", command, response) return True - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except logger.error("Errore invio comando SITE %s: %s", command, e) return False + def main(): """ Main function to connect to the database, fetch FTP user data, and send SITE ADDU commands to the FTP server. @@ -119,7 +115,7 @@ def main(): username, password = row # Costruisci il comando SITE completo - ftp_site_command = f'addu {username} {password}' + ftp_site_command = f"addu {username} {password}" logger.info("Sending ftp command: %s", ftp_site_command) @@ -131,7 +127,7 @@ def main(): logger.info("Elaborazione completata. Successi: %s, Errori: %s", success_count, error_count) - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except logger.error("Errore generale: %s", e) finally: @@ -139,14 +135,15 @@ def main(): try: ftp_connection.quit() logger.info("Connessione FTP chiusa") - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except logger.error("Errore chiusura connessione FTP: %s", e) try: db_connection.close() logger.info("Connessione MySQL chiusa") - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except logger.error("Errore chiusura connessione MySQL: %s", e) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/load_orchestrator.py b/src/load_orchestrator.py index 6e34f9a..beb463b 100755 --- a/src/load_orchestrator.py +++ b/src/load_orchestrator.py @@ -4,14 +4,14 @@ Orchestratore dei worker che caricano i dati su dataraw """ # Import necessary libraries -import logging -import importlib import asyncio +import importlib +import logging # Import custom modules for configuration and database connection from utils.config import loader_load_data as setting -from utils.database import WorkflowFlags from utils.csv.loaders import get_next_csv_atomic +from utils.database import WorkflowFlags from utils.orchestrator_utils import run_orchestrator, worker_context # Initialize the logger for this module @@ -58,7 +58,7 @@ async def worker(worker_id: int, cfg: dict, pool: object) -> None: logger.info("Nessun record disponibile") await asyncio.sleep(NO_RECORD_SLEEP) - except Exception as e: # pylint: disable=broad-except + except Exception as e: # pylint: disable=broad-except logger.error("Errore durante l'esecuzione: %s", e, exc_info=1) await asyncio.sleep(1) @@ -79,9 +79,7 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool: debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG logger.debug("Inizio ricerca nuovo CSV da elaborare") - rec_id, unit_type, tool_type, unit_name, tool_name = [ - x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record - ] + rec_id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record] logger.info( "Trovato CSV da elaborare: ID=%s, Tipo=%s_%s, Nome=%s_%s", rec_id, @@ -118,7 +116,7 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool: return False # Ottiene la funzione 'main_loader' dal modulo - funzione = getattr(modulo, "main_loader") + funzione = modulo.main_loader # Esegui la funzione logger.info("Elaborazione con modulo %s per ID=%s", modulo, rec_id) diff --git a/src/send_orchestrator.py b/src/send_orchestrator.py index 2d86739..42cc311 100755 --- a/src/send_orchestrator.py +++ b/src/send_orchestrator.py @@ -4,16 +4,16 @@ Orchestratore dei worker che inviano i dati ai clienti """ # Import necessary libraries -import logging import asyncio +import logging # Import custom modules for configuration and database connection from utils.config import loader_send_data as setting -from utils.database import WorkflowFlags -from utils.csv.loaders import get_next_csv_atomic -from utils.orchestrator_utils import run_orchestrator, worker_context from utils.connect.send_data import process_workflow_record +from utils.csv.loaders import get_next_csv_atomic +from utils.database import WorkflowFlags from utils.general import alterna_valori +from utils.orchestrator_utils import run_orchestrator, worker_context # from utils.ftp.send_data import ftp_send_elab_csv_to_customer, api_send_elab_csv_to_customer, \ # ftp_send_raw_csv_to_customer, api_send_raw_csv_to_customer diff --git a/src/utils/config/loader_email.py b/src/utils/config/loader_email.py index 0d6e027..daf64da 100644 --- a/src/utils/config/loader_email.py +++ b/src/utils/config/loader_email.py @@ -1,9 +1,10 @@ -"""set configurations +"""set configurations""" -""" from configparser import ConfigParser + from . import ENV_PARENT_PATH + class Config: def __init__(self): c = ConfigParser() @@ -22,4 +23,3 @@ class Config: self.smtp_port = c.getint("smtp", "port") self.smtp_user = c.get("smtp", "user") self.smtp_passwd = c.get("smtp", "password") - diff --git a/src/utils/config/loader_ftp_csv.py b/src/utils/config/loader_ftp_csv.py index 08fa9ba..d81fd93 100644 --- a/src/utils/config/loader_ftp_csv.py +++ b/src/utils/config/loader_ftp_csv.py @@ -1,9 +1,10 @@ -"""set configurations +"""set configurations""" -""" from configparser import ConfigParser + from . import ENV_PARENT_PATH + class Config: def __init__(self): """ @@ -40,7 +41,6 @@ class Config: self.dbname = c.get("db", "dbName") self.max_retries = c.getint("db", "maxRetries") - # Tables self.dbusertable = c.get("tables", "userTableName") self.dbrectable = c.get("tables", "recTableName") @@ -49,30 +49,24 @@ class Config: self.dbnodes = c.get("tables", "nodesTableName") # unit setting - self.units_name = [part for part in c.get("unit", "Names").split('|')] - self.units_type = [part for part in c.get("unit", "Types").split('|')] - self.units_alias = { - key: value - for item in c.get("unit", "Alias").split('|') - for key, value in [item.split(':', 1)] - } - #self.units_header = {key: int(value) for pair in c.get("unit", "Headers").split('|') for key, value in [pair.split(':')]} + self.units_name = [part for part in c.get("unit", "Names").split("|")] + self.units_type = [part for part in c.get("unit", "Types").split("|")] + self.units_alias = {key: value for item in c.get("unit", "Alias").split("|") for key, value in [item.split(":", 1)]} + # self.units_header = {key: int(value) for pair in c.get("unit", "Headers").split('|') for key, value in [pair.split(':')]} # tool setting - self.tools_name = [part for part in c.get("tool", "Names").split('|')] - self.tools_type = [part for part in c.get("tool", "Types").split('|')] + self.tools_name = [part for part in c.get("tool", "Names").split("|")] + self.tools_type = [part for part in c.get("tool", "Types").split("|")] self.tools_alias = { - key: key if value == '=' else value - for item in c.get("tool", "Alias").split('|') - for key, value in [item.split(':', 1)] + key: key if value == "=" else value for item in c.get("tool", "Alias").split("|") for key, value in [item.split(":", 1)] } # csv info - self.csv_infos = [part for part in c.get("csv", "Infos").split('|')] + self.csv_infos = [part for part in c.get("csv", "Infos").split("|")] # TS pini path match self.ts_pini_path_match = { - key: key[1:-1] if value == '=' else value - for item in c.get("ts_pini", "path_match").split('|') - for key, value in [item.split(':', 1)] + key: key[1:-1] if value == "=" else value + for item in c.get("ts_pini", "path_match").split("|") + for key, value in [item.split(":", 1)] } diff --git a/src/utils/config/loader_load_data.py b/src/utils/config/loader_load_data.py index acd74a2..4bcc0b9 100644 --- a/src/utils/config/loader_load_data.py +++ b/src/utils/config/loader_load_data.py @@ -1,9 +1,10 @@ -"""set configurations +"""set configurations""" -""" from configparser import ConfigParser + from . import ENV_PARENT_PATH + class Config: def __init__(self): """ diff --git a/src/utils/config/loader_matlab_elab.py b/src/utils/config/loader_matlab_elab.py index 0e4be45..39ad905 100644 --- a/src/utils/config/loader_matlab_elab.py +++ b/src/utils/config/loader_matlab_elab.py @@ -1,9 +1,10 @@ -"""set configurations +"""set configurations""" -""" from configparser import ConfigParser + from . import ENV_PARENT_PATH + class Config: def __init__(self): """ @@ -36,11 +37,11 @@ class Config: self.dbnodes = c.get("tables", "nodesTableName") # Tool - self.elab_status = [part for part in c.get("tool", "elab_status").split('|')] + self.elab_status = [part for part in c.get("tool", "elab_status").split("|")] # Matlab self.matlab_runtime = c.get("matlab", "runtime") self.matlab_func_path = c.get("matlab", "func_path") self.matlab_timeout = c.getint("matlab", "timeout") self.matlab_error = c.get("matlab", "error") - self.matlab_error_path = c.get("matlab", "error_path") \ No newline at end of file + self.matlab_error_path = c.get("matlab", "error_path") diff --git a/src/utils/config/loader_send_data.py b/src/utils/config/loader_send_data.py index 8343d8b..7271112 100644 --- a/src/utils/config/loader_send_data.py +++ b/src/utils/config/loader_send_data.py @@ -1,9 +1,10 @@ -"""set configurations +"""set configurations""" -""" from configparser import ConfigParser + from . import ENV_PARENT_PATH + class Config: def __init__(self): """ diff --git a/src/utils/config/users_loader.py b/src/utils/config/users_loader.py index 3fe6d50..1cec36a 100644 --- a/src/utils/config/users_loader.py +++ b/src/utils/config/users_loader.py @@ -1,15 +1,16 @@ -"""set configurations +"""set configurations""" -""" from configparser import ConfigParser + from . import ENV_PARENT_PATH + class Config: """ Handles configuration loading for database settings to load ftp users. """ - def __init__(self): + def __init__(self): c = ConfigParser() c.read([f"{ENV_PARENT_PATH}/env/db.ini"]) diff --git a/src/utils/connect/file_management.py b/src/utils/connect/file_management.py index a211d96..40d71ce 100644 --- a/src/utils/connect/file_management.py +++ b/src/utils/connect/file_management.py @@ -1,14 +1,16 @@ -import os -from datetime import datetime import logging +import os import re +from datetime import datetime + import mysql.connector -from utils.database.connection import connetti_db from utils.csv.parser import extract_value +from utils.database.connection import connetti_db logger = logging.getLogger(__name__) + def on_file_received(self: object, file: str) -> None: """ Processes a received file, extracts relevant information, and inserts it into the database. @@ -22,7 +24,7 @@ def on_file_received(self: object, file: str) -> None: if not os.stat(file).st_size: os.remove(file) - logger.info(f'File {file} is empty: removed.') + logger.info(f"File {file} is empty: removed.") else: cfg = self.cfg path, filenameExt = os.path.split(file) @@ -30,8 +32,8 @@ def on_file_received(self: object, file: str) -> None: timestamp = datetime.now().strftime("%Y%m%d%H%M%S") new_filename = f"{filename}_{timestamp}{fileExtension}" os.rename(file, f"{path}/{new_filename}") - if (fileExtension.upper() in (cfg.fileext)): - with open(f"{path}/{new_filename}", 'r', encoding='utf-8', errors='ignore') as csvfile: + if fileExtension.upper() in (cfg.fileext): + with open(f"{path}/{new_filename}", encoding="utf-8", errors="ignore") as csvfile: lines = csvfile.readlines() unit_name = extract_value(cfg.units_name, filename, str(lines[0:10])) @@ -42,50 +44,57 @@ def on_file_received(self: object, file: str) -> None: # se esiste l'alias in alias_unit_type, allora prende il valore dell'alias... verifica sia lo unit_type completo che i primi 3 caratteri per CO_xxxxx upper_unit_type = unit_type.upper() - unit_type = cfg.units_alias.get(upper_unit_type) or \ - cfg.units_alias.get(upper_unit_type[:3]) or \ - upper_unit_type + unit_type = cfg.units_alias.get(upper_unit_type) or cfg.units_alias.get(upper_unit_type[:3]) or upper_unit_type upper_tool_type = tool_type.upper() - tool_type = cfg.tools_alias.get(upper_tool_type) or \ - cfg.tools_alias.get(upper_tool_type[:3]) or \ - upper_tool_type + tool_type = cfg.tools_alias.get(upper_tool_type) or cfg.tools_alias.get(upper_tool_type[:3]) or upper_tool_type try: conn = connetti_db(cfg) except mysql.connector.Error as e: - logger.error(f'{e}') + logger.error(f"{e}") # Create a cursor cur = conn.cursor() - # da estrarre in un modulo - if (unit_type.upper() == "ISI CSV LOG" and tool_type.upper() == "VULINK" ): - serial_number = filename.split('_')[0] + if unit_type.upper() == "ISI CSV LOG" and tool_type.upper() == "VULINK": + serial_number = filename.split("_")[0] tool_info = f'{{"serial_number": {serial_number}}}' try: cur.execute(f"SELECT unit_name, tool_name FROM {cfg.dbname}.vulink_tools WHERE serial_number = '{serial_number}'") unit_name, tool_name = cur.fetchone() except Exception as e: - logger.warning(f'{tool_type} serial number {serial_number} not found in table vulink_tools. {e}') + logger.warning(f"{tool_type} serial number {serial_number} not found in table vulink_tools. {e}") # da estrarre in un modulo - if (unit_type.upper() == "STAZIONETOTALE" and tool_type.upper() == "INTEGRITY MONITOR" ): + if unit_type.upper() == "STAZIONETOTALE" and tool_type.upper() == "INTEGRITY MONITOR": escaped_keys = [re.escape(key) for key in cfg.ts_pini_path_match.keys()] stazione = extract_value(escaped_keys, filename) if stazione: tool_info = f'{{"Stazione": "{cfg.ts_pini_path_match.get(stazione)}"}}' try: - cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info) VALUES (%s,%s, %s, %s, %s, %s, %s, %s)", (self.username, new_filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), ''.join(lines), tool_info)) + cur.execute( + f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info) VALUES (%s,%s, %s, %s, %s, %s, %s, %s)", + ( + self.username, + new_filename, + unit_name.upper(), + unit_type.upper(), + tool_name.upper(), + tool_type.upper(), + "".join(lines), + tool_info, + ), + ) conn.commit() conn.close() except Exception as e: - logger.error(f'File {new_filename} not loaded. Held in user path.') - logger.error(f'{e}') + logger.error(f"File {new_filename} not loaded. Held in user path.") + logger.error(f"{e}") """ else: os.remove(file) logger.info(f'File {new_filename} removed.') - """ \ No newline at end of file + """ diff --git a/src/utils/connect/send_data.py b/src/utils/connect/send_data.py index a9856e4..8595c80 100644 --- a/src/utils/connect/send_data.py +++ b/src/utils/connect/send_data.py @@ -1,22 +1,23 @@ +import logging +from datetime import datetime from ftplib import FTP, FTP_TLS, all_errors from io import BytesIO -import logging -import aiomysql -from datetime import datetime -from utils.database.loader_action import update_status, unlock -from utils.database.action_query import get_data_as_csv, get_tool_info, get_elab_timestamp +import aiomysql + from utils.database import WorkflowFlags +from utils.database.action_query import get_data_as_csv, get_elab_timestamp, get_tool_info +from utils.database.loader_action import unlock, update_status logger = logging.getLogger(__name__) + class FTPConnection: """ Manages an FTP or FTP_TLS connection, providing a context manager for automatic disconnection. """ - def __init__(self, host, port=21, use_tls=False, user='', passwd='', - passive=True, timeout=None, debug=0, context=None): + def __init__(self, host, port=21, use_tls=False, user="", passwd="", passive=True, timeout=None, debug=0, context=None): self.use_tls = use_tls if use_tls: @@ -44,10 +45,12 @@ class FTPConnection: def __exit__(self, exc_type, exc_val, exc_tb): self.ftp.quit() + async def ftp_send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, pool: object) -> bool: None return True + async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool: """ Sends elaborated CSV data to a customer via FTP. @@ -81,26 +84,32 @@ async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str try: # Converti in bytes - csv_bytes = csv_data.encode('utf-8') + csv_bytes = csv_data.encode("utf-8") csv_buffer = BytesIO(csv_bytes) ftp_parms = await parse_ftp_parms(send_ftp_info["ftp_parm"]) - use_tls = 'ssl_version' in ftp_parms - passive = ftp_parms.get('passive', True) - port = ftp_parms.get('port', 21) + use_tls = "ssl_version" in ftp_parms + passive = ftp_parms.get("passive", True) + port = ftp_parms.get("port", 21) # Connessione FTP - with FTPConnection(host=send_ftp_info["ftp_addrs"], port=port, use_tls=use_tls, user=send_ftp_info["ftp_user"], passwd=send_ftp_info["ftp_passwd"], passive=passive) as ftp: - + with FTPConnection( + host=send_ftp_info["ftp_addrs"], + port=port, + use_tls=use_tls, + user=send_ftp_info["ftp_user"], + passwd=send_ftp_info["ftp_passwd"], + passive=passive, + ) as ftp: # Cambia directory if send_ftp_info["ftp_target"] != "/": ftp.cwd(send_ftp_info["ftp_target"]) # Invia il file - result = ftp.storbinary(f'STOR {send_ftp_info["ftp_filename"]}', csv_buffer) + result = ftp.storbinary(f"STOR {send_ftp_info['ftp_filename']}", csv_buffer) - if result.startswith('226'): - logger.info(f"File {send_ftp_info["ftp_filename"]} inviato con successo") + if result.startswith("226"): + logger.info(f"File {send_ftp_info['ftp_filename']} inviato con successo") return True else: logger.error(f"Errore nell'invio: {result}") @@ -115,6 +124,7 @@ async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str finally: csv_buffer.close() + async def parse_ftp_parms(ftp_parms: str) -> dict: """ Parses a string of FTP parameters into a dictionary. @@ -127,19 +137,19 @@ async def parse_ftp_parms(ftp_parms: str) -> dict: dict: A dictionary where keys are parameter names (lowercase) and values are their parsed values. """ # Rimuovere spazi e dividere per virgola - pairs = ftp_parms.split(',') + pairs = ftp_parms.split(",") result = {} for pair in pairs: - if '=>' in pair: - key, value = pair.split('=>', 1) + if "=>" in pair: + key, value = pair.split("=>", 1) key = key.strip().lower() value = value.strip().lower() # Convertire i valori appropriati if value.isdigit(): value = int(value) - elif value == '': + elif value == "": value = None result[key] = value @@ -158,10 +168,7 @@ async def process_workflow_record(record: tuple, fase: int, cfg: dict, pool: obj pool: Pool di connessioni al database """ # Estrazione e normalizzazione dei dati del record - id, unit_type, tool_type, unit_name, tool_name = [ - x.lower().replace(" ", "_") if isinstance(x, str) else x - for x in record - ] + id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record] try: # Recupero informazioni principali @@ -171,15 +178,15 @@ async def process_workflow_record(record: tuple, fase: int, cfg: dict, pool: obj # Verifica se il processing può essere eseguito if not _should_process(tool_elab_info, timestamp_matlab_elab): - logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: " - "invio dati non eseguito - due date raggiunta.") + logger.info( + f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: invio dati non eseguito - due date raggiunta." + ) await update_status(cfg, id, fase, pool) return # Routing basato sulla fase - success = await _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name, - timestamp_matlab_elab, pool) + success = await _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool) if success: await update_status(cfg, id, fase, pool) @@ -207,7 +214,7 @@ def _should_process(tool_elab_info: dict, timestamp_matlab_elab: datetime) -> bo duedate = tool_elab_info.get("duedate") # Se non c'è duedate o è vuota/nulla, può essere processato - if not duedate or duedate in ('0000-00-00 00:00:00', ''): + if not duedate or duedate in ("0000-00-00 00:00:00", ""): return True # Se timestamp_matlab_elab è None/null, usa il timestamp corrente @@ -215,18 +222,18 @@ def _should_process(tool_elab_info: dict, timestamp_matlab_elab: datetime) -> bo # Converti duedate in datetime se è una stringa if isinstance(duedate, str): - duedate = datetime.strptime(duedate, '%Y-%m-%d %H:%M:%S') + duedate = datetime.strptime(duedate, "%Y-%m-%d %H:%M:%S") # Assicurati che comparison_timestamp sia datetime if isinstance(comparison_timestamp, str): - comparison_timestamp = datetime.strptime(comparison_timestamp, '%Y-%m-%d %H:%M:%S') + comparison_timestamp = datetime.strptime(comparison_timestamp, "%Y-%m-%d %H:%M:%S") return duedate > comparison_timestamp - -async def _route_by_phase(fase: int, tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, - timestamp_matlab_elab: datetime, pool: object) -> bool: +async def _route_by_phase( + fase: int, tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object +) -> bool: """ Routes the processing of a workflow record based on the current phase. @@ -247,20 +254,19 @@ async def _route_by_phase(fase: int, tool_elab_info: dict, cfg: dict, id: int, u bool: True if the data sending operation was successful or no action was needed, False otherwise. """ if fase == WorkflowFlags.SENT_ELAB_DATA: - return await _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name, - tool_name, timestamp_matlab_elab, pool) + return await _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool) elif fase == WorkflowFlags.SENT_RAW_DATA: - return await _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name, - tool_name, pool) + return await _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, pool) else: logger.info(f"id {id} - {unit_name} - {tool_name}: nessuna azione da eseguire.") return True -async def _handle_elab_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, - timestamp_matlab_elab: datetime, pool: object) -> bool: +async def _handle_elab_data_phase( + tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object +) -> bool: """ Handles the phase of sending elaborated data. @@ -281,14 +287,12 @@ async def _handle_elab_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit bool: True if the data sending operation was successful or no action was needed, False otherwise. """ # FTP send per dati elaborati - if tool_elab_info.get('ftp_send'): - return await _send_elab_data_ftp(cfg, id, unit_name, tool_name, - timestamp_matlab_elab, pool) + if tool_elab_info.get("ftp_send"): + return await _send_elab_data_ftp(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool) # API send per dati elaborati elif _should_send_elab_api(tool_elab_info): - return await _send_elab_data_api(cfg, id, unit_name, tool_name, - timestamp_matlab_elab, pool) + return await _send_elab_data_api(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool) return True @@ -313,9 +317,8 @@ async def _handle_raw_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit_ bool: True if the data sending operation was successful or no action was needed, False otherwise. """ - # FTP send per dati raw - if tool_elab_info.get('ftp_send_raw'): + if tool_elab_info.get("ftp_send_raw"): return await _send_raw_data_ftp(cfg, id, unit_name, tool_name, pool) # API send per dati raw @@ -327,16 +330,16 @@ async def _handle_raw_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit_ def _should_send_elab_api(tool_elab_info: dict) -> bool: """Verifica se i dati elaborati devono essere inviati via API.""" - return (tool_elab_info.get('inoltro_api') and - tool_elab_info.get('api_send') and - tool_elab_info.get('inoltro_api_url', '').strip()) + return tool_elab_info.get("inoltro_api") and tool_elab_info.get("api_send") and tool_elab_info.get("inoltro_api_url", "").strip() def _should_send_raw_api(tool_elab_info: dict) -> bool: """Verifica se i dati raw devono essere inviati via API.""" - return (tool_elab_info.get('inoltro_api_raw') and - tool_elab_info.get('api_send_raw') and - tool_elab_info.get('inoltro_api_url_raw', '').strip()) + return ( + tool_elab_info.get("inoltro_api_raw") + and tool_elab_info.get("api_send_raw") + and tool_elab_info.get("inoltro_api_url_raw", "").strip() + ) async def _send_elab_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object) -> bool: @@ -358,8 +361,7 @@ async def _send_elab_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str bool: True if the FTP sending was successful, False otherwise. """ try: - elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, - timestamp_matlab_elab, pool) + elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool) if not elab_csv: return False @@ -395,8 +397,7 @@ async def _send_elab_data_api(cfg: dict, id: int, unit_name: str, tool_name: str bool: True if the API sending was successful, False otherwise. """ try: - elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, - timestamp_matlab_elab, pool) + elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool) if not elab_csv: return False @@ -470,4 +471,4 @@ async def _send_raw_data_api(cfg: dict, id: int, unit_name: str, tool_name: str, except Exception as e: logger.error(f"Errore invio API raw data id {id}: {e}") - return False \ No newline at end of file + return False diff --git a/src/utils/connect/send_email.py b/src/utils/connect/send_email.py index e147600..a4a5ebe 100644 --- a/src/utils/connect/send_email.py +++ b/src/utils/connect/send_email.py @@ -1,11 +1,13 @@ -import smtplib import logging +import smtplib from email.message import EmailMessage + from utils.config import loader_email as setting cfg = setting.Config() logger = logging.getLogger(__name__) + async def send_error_email(unit_name: str, tool_name: str, matlab_cmd: str, matlab_error: str, errors: list, warnings: list) -> None: """ Sends an error email containing details about a MATLAB processing failure. @@ -24,24 +26,33 @@ async def send_error_email(unit_name: str, tool_name: str, matlab_cmd: str, matl # Creazione dell'oggetto messaggio msg = EmailMessage() - msg['Subject'] = cfg.subject - msg['From'] = cfg.from_addr - msg['To'] = cfg.to_addr - msg['Cc'] = cfg.cc_addr - msg['Bcc'] = cfg.bcc_addr + msg["Subject"] = cfg.subject + msg["From"] = cfg.from_addr + msg["To"] = cfg.to_addr + msg["Cc"] = cfg.cc_addr + msg["Bcc"] = cfg.bcc_addr MatlabErrors = "
".join(errors) MatlabWarnings = "
".join(dict.fromkeys(warnings)) # Imposta il contenuto del messaggio come HTML - msg.add_alternative(cfg.body.format(unit=unit_name, tool=tool_name, matlab_cmd=matlab_cmd, matlab_error=matlab_error, - MatlabErrors=MatlabErrors, MatlabWarnings=MatlabWarnings), subtype='html') + msg.add_alternative( + cfg.body.format( + unit=unit_name, + tool=tool_name, + matlab_cmd=matlab_cmd, + matlab_error=matlab_error, + MatlabErrors=MatlabErrors, + MatlabWarnings=MatlabWarnings, + ), + subtype="html", + ) try: # Connessione al server SMTP with smtplib.SMTP(cfg.smtp_addr, cfg.smtp_port) as server: server.starttls() # Avvia la crittografia TLS per una connessione sicura - server.login(cfg.smtp_user, cfg.smtp_passwd) # Autenticazione con il server + server.login(cfg.smtp_user, cfg.smtp_passwd) # Autenticazione con il server server.send_message(msg) # Invio dell'email logger.info("Email inviata con successo!") except Exception as e: - logger.error(f"Errore durante l'invio dell'email: {e}") \ No newline at end of file + logger.error(f"Errore durante l'invio dell'email: {e}") diff --git a/src/utils/connect/user_admin.py b/src/utils/connect/user_admin.py index 2d097e4..55ceb0c 100644 --- a/src/utils/connect/user_admin.py +++ b/src/utils/connect/user_admin.py @@ -1,14 +1,15 @@ -import os -import mysql.connector import logging - +import os from hashlib import sha256 from pathlib import Path +import mysql.connector + from utils.database.connection import connetti_db logger = logging.getLogger(__name__) + def ftp_SITE_ADDU(self: object, line: str) -> None: """ Adds a virtual user, creates their directory, and saves their details to the database. @@ -22,38 +23,40 @@ def ftp_SITE_ADDU(self: object, line: str) -> None: user = os.path.basename(parms[0]) # Extract the username password = parms[1] # Get the password hash = sha256(password.encode("UTF-8")).hexdigest() # Hash the password - except IndexError: - self.respond('501 SITE ADDU failed. Command needs 2 arguments') + except IndexError: + self.respond("501 SITE ADDU failed. Command needs 2 arguments") else: try: # Create the user's directory Path(cfg.virtpath + user).mkdir(parents=True, exist_ok=True) except Exception as e: - self.respond(f'551 Error in create virtual user path: {e}') + self.respond(f"551 Error in create virtual user path: {e}") else: try: # Add the user to the authorizer - self.authorizer.add_user(str(user), - hash, cfg.virtpath + "/" + user, perm=cfg.defperm) + self.authorizer.add_user(str(user), hash, cfg.virtpath + "/" + user, perm=cfg.defperm) # Save the user to the database # Define the database connection try: conn = connetti_db(cfg) except mysql.connector.Error as e: print(f"Error: {e}") - logger.error(f'{e}') + logger.error(f"{e}") # Create a cursor cur = conn.cursor() - cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('{user}', '{hash}', '{cfg.virtpath + user}', '{cfg.defperm}')") + cur.execute( + f"INSERT INTO {cfg.dbname}.{cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('{user}', '{hash}', '{cfg.virtpath + user}', '{cfg.defperm}')" + ) conn.commit() conn.close() logger.info(f"User {user} created.") - self.respond('200 SITE ADDU successful.') + self.respond("200 SITE ADDU successful.") except Exception as e: - self.respond(f'501 SITE ADDU failed: {e}.') + self.respond(f"501 SITE ADDU failed: {e}.") print(e) + def ftp_SITE_DISU(self: object, line: str) -> None: """ Removes a virtual user from the authorizer and marks them as deleted in the database. @@ -72,7 +75,7 @@ def ftp_SITE_DISU(self: object, line: str) -> None: conn = connetti_db(cfg) except mysql.connector.Error as e: print(f"Error: {e}") - logger.error(f'{e}') + logger.error(f"{e}") # Crea un cursore cur = conn.cursor() @@ -81,11 +84,12 @@ def ftp_SITE_DISU(self: object, line: str) -> None: conn.close() logger.info(f"User {user} deleted.") - self.respond('200 SITE DISU successful.') + self.respond("200 SITE DISU successful.") except Exception as e: - self.respond('501 SITE DISU failed.') + self.respond("501 SITE DISU failed.") print(e) + def ftp_SITE_ENAU(self: object, line: str) -> None: """ Restores a virtual user by updating their status in the database and adding them back to the authorizer. @@ -102,7 +106,7 @@ def ftp_SITE_ENAU(self: object, line: str) -> None: conn = connetti_db(cfg) except mysql.connector.Error as e: print(f"Error: {e}") - logger.error(f'{e}') + logger.error(f"{e}") # Crea un cursore cur = conn.cursor() @@ -118,18 +122,19 @@ def ftp_SITE_ENAU(self: object, line: str) -> None: self.authorizer.add_user(ftpuser, hash, virtpath, perm) try: Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True) - except Exception as e: - self.responde(f'551 Error in create virtual user path: {e}') + except Exception as e: + self.responde(f"551 Error in create virtual user path: {e}") conn.close() logger.info(f"User {user} restored.") - self.respond('200 SITE ENAU successful.') + self.respond("200 SITE ENAU successful.") except Exception as e: - self.respond('501 SITE ENAU failed.') + self.respond("501 SITE ENAU failed.") print(e) + def ftp_SITE_LSTU(self: object, line: str) -> None: """ Lists all virtual users from the database. @@ -145,15 +150,18 @@ def ftp_SITE_LSTU(self: object, line: str) -> None: conn = connetti_db(cfg) except mysql.connector.Error as e: print(f"Error: {e}") - logger.error(f'{e}') + logger.error(f"{e}") # Crea un cursore cur = conn.cursor() self.push("214-The following virtual users are defined:\r\n") - cur.execute(f'SELECT ftpuser, perm, disabled_at FROM {cfg.dbname}.{cfg.dbusertable}') - [users_list.append(f'Username: {ftpuser}\tPerms: {perm}\tDisabled: {disabled_at}\r\n') for ftpuser, perm, disabled_at in cur.fetchall()] - self.push(''.join(users_list)) + cur.execute(f"SELECT ftpuser, perm, disabled_at FROM {cfg.dbname}.{cfg.dbusertable}") + [ + users_list.append(f"Username: {ftpuser}\tPerms: {perm}\tDisabled: {disabled_at}\r\n") + for ftpuser, perm, disabled_at in cur.fetchall() + ] + self.push("".join(users_list)) self.respond("214 LSTU SITE command successful.") - except Exception as e: - self.respond(f'501 list users failed: {e}') \ No newline at end of file + except Exception as e: + self.respond(f"501 list users failed: {e}") diff --git a/src/utils/csv/data_preparation.py b/src/utils/csv/data_preparation.py index 698edff..f2bc730 100644 --- a/src/utils/csv/data_preparation.py +++ b/src/utils/csv/data_preparation.py @@ -1,15 +1,16 @@ #!.venv/bin/python -from utils.database.nodes_query import get_nodes_type -from utils.timestamp.date_check import normalizza_data, normalizza_orario -from utils.database.loader_action import find_nearest_timestamp import logging import re - -from itertools import islice from datetime import datetime, timedelta +from itertools import islice + +from utils.database.loader_action import find_nearest_timestamp +from utils.database.nodes_query import get_nodes_type +from utils.timestamp.date_check import normalizza_data, normalizza_orario logger = logging.getLogger(__name__) + async def get_data(cfg: object, id: int, pool: object) -> tuple: """ Retrieves unit name, tool name, and tool data for a given record ID from the database. @@ -23,11 +24,12 @@ async def get_data(cfg: object, id: int, pool: object) -> tuple: """ async with pool.acquire() as conn: async with conn.cursor() as cur: - await cur.execute(f'select filename, unit_name, tool_name, tool_data from {cfg.dbrectable} where id = {id}') + await cur.execute(f"select filename, unit_name, tool_name, tool_data from {cfg.dbrectable} where id = {id}") filename, unit_name, tool_name, tool_data = await cur.fetchone() return filename, unit_name, tool_name, tool_data + async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list: """ Processes pipe-separated data from a CSV record into a structured matrix. @@ -49,24 +51,35 @@ async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list: che hanno il pattern '.-' perché sono letture con un numero errato - negativo dopo la virgola che hanno il pattern 'File Creation' perché vuol dire che c'è stato un errore della centralina """ - for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]: - timestamp, batlevel, temperature, rilevazioni = riga.split(';',3) - EventDate, EventTime = timestamp.split(' ') - if batlevel == '|': + for riga in [ + riga + for riga in righe + if ";|;" in riga and "No RX" not in riga and ".-" not in riga and "File Creation" not in riga and riga.isprintable() + ]: + timestamp, batlevel, temperature, rilevazioni = riga.split(";", 3) + EventDate, EventTime = timestamp.split(" ") + if batlevel == "|": batlevel = temperature - temperature, rilevazioni = rilevazioni.split(';',1) - ''' in alcune letture mancano temperatura e livello batteria''' - if temperature == '': + temperature, rilevazioni = rilevazioni.split(";", 1) + """ in alcune letture mancano temperatura e livello batteria""" + if temperature == "": temperature = 0 - if batlevel == '': + if batlevel == "": batlevel = 0 - valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;') # Toglie '|;' iniziali, toglie eventuali ';' finali, dividi per ';|;' + valori_nodi = ( + rilevazioni.lstrip("|;").rstrip(";").split(";|;") + ) # Toglie '|;' iniziali, toglie eventuali ';' finali, dividi per ';|;' for num_nodo, valori_nodo in enumerate(valori_nodi, start=1): - valori = valori_nodo.split(';') - matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori)))) + valori = valori_nodo.split(";") + matrice_valori.append( + [UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + + valori + + ([None] * (19 - len(valori))) + ) return matrice_valori + async def make_ain_din_matrix(cfg: object, id: int, pool: object) -> list: """ Processes analog and digital input data from a CSV record into a structured matrix. @@ -82,25 +95,34 @@ async def make_ain_din_matrix(cfg: object, id: int, pool: object) -> list: node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool) righe = ToolData.splitlines() matrice_valori = [] - pattern = r'^(?:\d{4}\/\d{2}\/\d{2}|\d{2}\/\d{2}\/\d{4}) \d{2}:\d{2}:\d{2}(?:;\d+\.\d+){2}(?:;\d+){4}$' + pattern = r"^(?:\d{4}\/\d{2}\/\d{2}|\d{2}\/\d{2}\/\d{4}) \d{2}:\d{2}:\d{2}(?:;\d+\.\d+){2}(?:;\d+){4}$" if node_ains or node_dins: for riga in [riga for riga in righe if re.match(pattern, riga)]: - timestamp, batlevel, temperature, analog_input1, analog_input2, digital_input1, digital_input2 = riga.split(';') - EventDate, EventTime = timestamp.split(' ') + timestamp, batlevel, temperature, analog_input1, analog_input2, digital_input1, digital_input2 = riga.split(";") + EventDate, EventTime = timestamp.split(" ") if any(node_ains): for node_num, analog_act in enumerate([analog_input1, analog_input2], start=1): - matrice_valori.append([UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + [analog_act] + ([None] * (19 - 1))) + matrice_valori.append( + [UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + + [analog_act] + + ([None] * (19 - 1)) + ) else: logger.info(f"Nessun Ingresso analogico per {UnitName} {ToolNameID}") if any(node_dins): start_node = 3 if any(node_ains) else 1 for node_num, digital_act in enumerate([digital_input1, digital_input2], start=start_node): - matrice_valori.append([UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + [digital_act] + ([None] * (19 - 1))) + matrice_valori.append( + [UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + + [digital_act] + + ([None] * (19 - 1)) + ) else: logger.info(f"Nessun Ingresso digitale per {UnitName} {ToolNameID}") return matrice_valori + async def make_channels_matrix(cfg: object, id: int, pool: object) -> list: """ Processes channel-based data from a CSV record into a structured matrix. @@ -116,19 +138,28 @@ async def make_channels_matrix(cfg: object, id: int, pool: object) -> list: node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool) righe = ToolData.splitlines() matrice_valori = [] - for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]: - timestamp, batlevel, temperature, rilevazioni = riga.replace(';|;',';').split(';',3) - EventDate, EventTime = timestamp.split(' ') - valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|'] + for riga in [ + riga + for riga in righe + if ";|;" in riga and "No RX" not in riga and ".-" not in riga and "File Creation" not in riga and riga.isprintable() + ]: + timestamp, batlevel, temperature, rilevazioni = riga.replace(";|;", ";").split(";", 3) + EventDate, EventTime = timestamp.split(" ") + valori_splitted = [valore for valore in rilevazioni.split(";") if valore != "|"] valori_iter = iter(valori_splitted) valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels] for num_nodo, valori in enumerate(valori_nodi, start=1): - matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori)))) + matrice_valori.append( + [UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + + valori + + ([None] * (19 - len(valori))) + ) return matrice_valori + async def make_musa_matrix(cfg: object, id: int, pool: object) -> list: """ Processes 'Musa' specific data from a CSV record into a structured matrix. @@ -144,20 +175,28 @@ async def make_musa_matrix(cfg: object, id: int, pool: object) -> list: node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool) righe = ToolData.splitlines() matrice_valori = [] - for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]: - timestamp, batlevel, rilevazioni = riga.replace(';|;',';').split(';',2) - if timestamp == '': + for riga in [ + riga + for riga in righe + if ";|;" in riga and "No RX" not in riga and ".-" not in riga and "File Creation" not in riga and riga.isprintable() + ]: + timestamp, batlevel, rilevazioni = riga.replace(";|;", ";").split(";", 2) + if timestamp == "": continue - EventDate, EventTime = timestamp.split(' ') - temperature = rilevazioni.split(';')[0] - logger.info(f'{temperature}, {rilevazioni}') - valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|'] + EventDate, EventTime = timestamp.split(" ") + temperature = rilevazioni.split(";")[0] + logger.info(f"{temperature}, {rilevazioni}") + valori_splitted = [valore for valore in rilevazioni.split(";") if valore != "|"] valori_iter = iter(valori_splitted) valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels] for num_nodo, valori in enumerate(valori_nodi, start=1): - matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori)))) + matrice_valori.append( + [UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + + valori + + ([None] * (19 - len(valori))) + ) return matrice_valori @@ -178,17 +217,20 @@ async def make_tlp_matrix(cfg: object, id: int, pool: object) -> list: valori_x_nodo = 2 matrice_valori = [] for riga in righe: - timestamp, batlevel, temperature, barometer, rilevazioni = riga.split(';',4) - EventDate, EventTime = timestamp.split(' ') - lista_rilevazioni = rilevazioni.strip(';').split(';') + timestamp, batlevel, temperature, barometer, rilevazioni = riga.split(";", 4) + EventDate, EventTime = timestamp.split(" ") + lista_rilevazioni = rilevazioni.strip(";").split(";") lista_rilevazioni.append(barometer) - valori_nodi = [lista_rilevazioni[i:i + valori_x_nodo] for i in range(0, len(lista_rilevazioni), valori_x_nodo)] + valori_nodi = [lista_rilevazioni[i : i + valori_x_nodo] for i in range(0, len(lista_rilevazioni), valori_x_nodo)] for num_nodo, valori in enumerate(valori_nodi, start=1): - matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori)))) + matrice_valori.append( + [UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + + valori + + ([None] * (19 - len(valori))) + ) return matrice_valori - async def make_gd_matrix(cfg: object, id: int, pool: object) -> list: """ Processes 'GD' specific data from a CSV record into a structured matrix. @@ -203,34 +245,64 @@ async def make_gd_matrix(cfg: object, id: int, pool: object) -> list: filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) righe = ToolData.splitlines() matrice_valori = [] - pattern = r';-?\d+dB$' - for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]: - timestamp, rilevazioni = riga.split(';|;',1) - EventDate, EventTime = timestamp.split(' ') - #logger.debug(f"GD id {id}: {pattern} {rilevazioni}") + pattern = r";-?\d+dB$" + for riga in [ + riga + for riga in righe + if ";|;" in riga and "No RX" not in riga and ".-" not in riga and "File Creation" not in riga and riga.isprintable() + ]: + timestamp, rilevazioni = riga.split(";|;", 1) + EventDate, EventTime = timestamp.split(" ") + # logger.debug(f"GD id {id}: {pattern} {rilevazioni}") if re.search(pattern, rilevazioni): if len(matrice_valori) == 0: - matrice_valori.append(['RSSI']) - batlevel, temperature, rssi = rilevazioni.split(';') - #logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {rssi}") + matrice_valori.append(["RSSI"]) + batlevel, temperature, rssi = rilevazioni.split(";") + # logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {rssi}") gd_timestamp = datetime.strptime(f"{normalizza_data(EventDate)} {normalizza_orario(EventTime)}", "%Y-%m-%d %H:%M:%S") start_timestamp = gd_timestamp - timedelta(seconds=45) end_timestamp = gd_timestamp + timedelta(seconds=45) - matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 1, f"{start_timestamp:%Y-%m-%d %H:%M:%S}", f"{end_timestamp:%Y-%m-%d %H:%M:%S}", f"{gd_timestamp:%Y-%m-%d %H:%M:%S}", batlevel, temperature, int(rssi[:-2])]) + matrice_valori.append( + [ + UnitName, + ToolNameID.replace("GD", "DT"), + 1, + f"{start_timestamp:%Y-%m-%d %H:%M:%S}", + f"{end_timestamp:%Y-%m-%d %H:%M:%S}", + f"{gd_timestamp:%Y-%m-%d %H:%M:%S}", + batlevel, + temperature, + int(rssi[:-2]), + ] + ) - elif all(char == ';' for char in rilevazioni): + elif all(char == ";" for char in rilevazioni): pass - elif ';|;' in rilevazioni: - unit_metrics, data = rilevazioni.split(';|;') - batlevel, temperature = unit_metrics.split(';') - #logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {data}") + elif ";|;" in rilevazioni: + unit_metrics, data = rilevazioni.split(";|;") + batlevel, temperature = unit_metrics.split(";") + # logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {data}") - dt_timestamp, dt_batlevel, dt_temperature = await find_nearest_timestamp(cfg, {"timestamp": f"{normalizza_data(EventDate)} {normalizza_orario(EventTime)}", "unit": UnitName, "tool": ToolNameID.replace("GD", "DT"), "node_num": 1}, pool) - EventDate, EventTime = dt_timestamp.strftime('%Y-%m-%d %H:%M:%S').split(' ') - valori = data.split(';') - matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 2, EventDate, EventTime, float(dt_batlevel), float(dt_temperature)] + valori + ([None] * (16 - len(valori))) + [batlevel, temperature, None]) + dt_timestamp, dt_batlevel, dt_temperature = await find_nearest_timestamp( + cfg, + { + "timestamp": f"{normalizza_data(EventDate)} {normalizza_orario(EventTime)}", + "unit": UnitName, + "tool": ToolNameID.replace("GD", "DT"), + "node_num": 1, + }, + pool, + ) + EventDate, EventTime = dt_timestamp.strftime("%Y-%m-%d %H:%M:%S").split(" ") + valori = data.split(";") + matrice_valori.append( + [UnitName, ToolNameID.replace("GD", "DT"), 2, EventDate, EventTime, float(dt_batlevel), float(dt_temperature)] + + valori + + ([None] * (16 - len(valori))) + + [batlevel, temperature, None] + ) else: logger.warning(f"GD id {id}: dati non trattati - {rilevazioni}") - return matrice_valori \ No newline at end of file + return matrice_valori diff --git a/src/utils/csv/loaders.py b/src/utils/csv/loaders.py index 5966999..00e2c5b 100644 --- a/src/utils/csv/loaders.py +++ b/src/utils/csv/loaders.py @@ -1,16 +1,23 @@ import asyncio -import tempfile -import os - -from utils.database.loader_action import load_data, update_status, unlock -from utils.database import WorkflowFlags -from utils.csv.data_preparation import make_pipe_sep_matrix, make_ain_din_matrix, make_channels_matrix, make_tlp_matrix, make_gd_matrix, make_musa_matrix, get_data - - import logging +import os +import tempfile + +from utils.csv.data_preparation import ( + get_data, + make_ain_din_matrix, + make_channels_matrix, + make_gd_matrix, + make_musa_matrix, + make_pipe_sep_matrix, + make_tlp_matrix, +) +from utils.database import WorkflowFlags +from utils.database.loader_action import load_data, unlock, update_status logger = logging.getLogger(__name__) + async def main_loader(cfg: object, id: int, pool: object, action: str) -> None: """ Main loader function to process CSV data based on the specified action. @@ -27,7 +34,7 @@ async def main_loader(cfg: object, id: int, pool: object, action: str) -> None: "channels": make_channels_matrix, "tlp": make_tlp_matrix, "gd": make_gd_matrix, - "musa": make_musa_matrix + "musa": make_musa_matrix, } if action in type_matrix_mapping: function_to_call = type_matrix_mapping[action] @@ -69,7 +76,8 @@ async def get_next_csv_atomic(pool: object, table_name: str, status: int, next_s async with conn.cursor() as cur: # Usa SELECT FOR UPDATE per lock atomico - await cur.execute(f""" + await cur.execute( + f""" SELECT id, unit_type, tool_type, unit_name, tool_name FROM {table_name} WHERE locked = 0 @@ -78,15 +86,20 @@ async def get_next_csv_atomic(pool: object, table_name: str, status: int, next_s ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED - """, (status, status, next_status)) + """, + (status, status, next_status), + ) result = await cur.fetchone() if result: - await cur.execute(f""" + await cur.execute( + f""" UPDATE {table_name} SET locked = 1 WHERE id = %s - """, (result[0],)) + """, + (result[0],), + ) # Commit esplicito per rilasciare il lock await conn.commit() @@ -97,6 +110,7 @@ async def get_next_csv_atomic(pool: object, table_name: str, status: int, next_s await conn.rollback() raise e + async def main_old_script_loader(cfg: object, id: int, pool: object, script_name: str) -> None: """ This function retrieves CSV data, writes it to a temporary file, @@ -110,21 +124,19 @@ async def main_old_script_loader(cfg: object, id: int, pool: object, script_name """ filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) # Creare un file temporaneo - with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file: + with tempfile.NamedTemporaryFile(mode="w", prefix=filename, suffix=".csv", delete=False) as temp_file: temp_file.write(ToolData) temp_filename = temp_file.name try: # Usa asyncio.subprocess per vero async process = await asyncio.create_subprocess_exec( - 'python3', f'old_scripts/{script_name}.py', temp_filename, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE + "python3", f"old_scripts/{script_name}.py", temp_filename, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() - result_stdout = stdout.decode('utf-8') - result_stderr = stderr.decode('utf-8') + result_stdout = stdout.decode("utf-8") + result_stderr = stderr.decode("utf-8") finally: # Pulire il file temporaneo @@ -138,4 +150,4 @@ async def main_old_script_loader(cfg: object, id: int, pool: object, script_name logger.debug(f"Stdout: {result_stdout}") await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool) await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool) - await unlock(cfg, id, pool) \ No newline at end of file + await unlock(cfg, id, pool) diff --git a/src/utils/csv/parser.py b/src/utils/csv/parser.py index 1a0f408..804bc2a 100644 --- a/src/utils/csv/parser.py +++ b/src/utils/csv/parser.py @@ -1,6 +1,7 @@ import re -def extract_value(patterns: list, primary_source: str, secondary_source: str = None, default: str='Not Defined') -> str: + +def extract_value(patterns: list, primary_source: str, secondary_source: str = None, default: str = "Not Defined") -> str: """ Extracts a value from a given source (or sources) based on a list of regex patterns. @@ -12,7 +13,8 @@ def extract_value(patterns: list, primary_source: str, secondary_source: str = N Args: patterns (list): A list of regular expression strings to search for. primary_source (str): The main string to search within. - secondary_source (str, optional): An additional string to search within if no match is found in the primary source. Defaults to None. + secondary_source (str, optional): An additional string to search within if no match is found in the primary source. + Defaults to None. default (str, optional): The value to return if no match is found. Defaults to 'Not Defined'. Returns: diff --git a/src/utils/database/__init__.py b/src/utils/database/__init__.py index 6c2c04c..0154e97 100644 --- a/src/utils/database/__init__.py +++ b/src/utils/database/__init__.py @@ -4,24 +4,25 @@ class WorkflowFlags: Each flag is a power of 2, allowing them to be combined using bitwise operations to represent multiple states simultaneously. """ - CSV_RECEIVED = 0 # 0000 - DATA_LOADED = 1 # 0001 - START_ELAB = 2 # 0010 - DATA_ELABORATED = 4 # 0100 - SENT_RAW_DATA = 8 # 1000 - SENT_ELAB_DATA = 16 # 10000 - DUMMY_ELABORATED = 32 # 100000 (Used for testing or specific dummy elaborations) + + CSV_RECEIVED = 0 # 0000 + DATA_LOADED = 1 # 0001 + START_ELAB = 2 # 0010 + DATA_ELABORATED = 4 # 0100 + SENT_RAW_DATA = 8 # 1000 + SENT_ELAB_DATA = 16 # 10000 + DUMMY_ELABORATED = 32 # 100000 (Used for testing or specific dummy elaborations) + # Mappatura flag -> colonna timestamp FLAG_TO_TIMESTAMP = { - WorkflowFlags.CSV_RECEIVED: "inserted_at", WorkflowFlags.DATA_LOADED: "loaded_at", WorkflowFlags.START_ELAB: "start_elab_at", WorkflowFlags.DATA_ELABORATED: "elaborated_at", WorkflowFlags.SENT_RAW_DATA: "sent_raw_at", WorkflowFlags.SENT_ELAB_DATA: "sent_elab_at", - WorkflowFlags.DUMMY_ELABORATED: "elaborated_at" # Shares the same timestamp column as DATA_ELABORATED + WorkflowFlags.DUMMY_ELABORATED: "elaborated_at", # Shares the same timestamp column as DATA_ELABORATED } """ A dictionary mapping each WorkflowFlag to the corresponding database column @@ -33,4 +34,4 @@ BATCH_SIZE = 1000 """ The number of records to process in a single batch when loading data into the database. This helps manage memory usage and improve performance for large datasets. -""" \ No newline at end of file +""" diff --git a/src/utils/database/action_query.py b/src/utils/database/action_query.py index 1206228..0667cb4 100644 --- a/src/utils/database/action_query.py +++ b/src/utils/database/action_query.py @@ -1,18 +1,18 @@ -import logging -import aiomysql import csv +import logging from io import StringIO + +import aiomysql + from utils.database import WorkflowFlags logger = logging.getLogger(__name__) sub_select = { - WorkflowFlags.DATA_ELABORATED: - """m.matcall, s.`desc` AS statustools""", - WorkflowFlags.SENT_RAW_DATA: - """t.ftp_send, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, s.`desc` AS statustools, IFNULL(u.duedate, "") AS duedate""", - WorkflowFlags.SENT_ELAB_DATA: - """t.ftp_send_raw, IFNULL(u.ftp_mode_raw, "") AS ftp_mode_raw, + WorkflowFlags.DATA_ELABORATED: """m.matcall, s.`desc` AS statustools""", + WorkflowFlags.SENT_RAW_DATA: """t.ftp_send, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, + s.`desc` AS statustools, IFNULL(u.duedate, "") AS duedate""", + WorkflowFlags.SENT_ELAB_DATA: """t.ftp_send_raw, IFNULL(u.ftp_mode_raw, "") AS ftp_mode_raw, IFNULL(u.ftp_addrs_raw, "") AS ftp_addrs_raw, IFNULL(u.ftp_user_raw, "") AS ftp_user_raw, IFNULL(u.ftp_passwd_raw, "") AS ftp_passwd_raw, IFNULL(u.ftp_filename_raw, "") AS ftp_filename_raw, IFNULL(u.ftp_parm_raw, "") AS ftp_parm_raw, IFNULL(u.ftp_target_raw, "") AS ftp_target_raw, @@ -20,8 +20,9 @@ sub_select = { IFNULL(u.inoltro_api_url_raw, "") AS inoltro_api_url_raw, IFNULL(u.inoltro_api_bearer_token_raw, "") AS inoltro_api_bearer_token_raw, t.api_send_raw, IFNULL(u.duedate, "") AS duedate - """ - } + """, +} + async def get_tool_info(next_status: int, unit: str, tool: str, pool: object) -> tuple: """ @@ -89,7 +90,8 @@ async def get_data_as_csv(cfg: dict, id_recv: int, unit: str, tool: str, matlab_ select * from ( select 'ToolNameID', 'EventDate', 'EventTime', 'NodeNum', 'NodeType', 'NodeDepth', 'XShift', 'YShift', 'ZShift' , 'X', 'Y', 'Z', 'HShift', 'HShiftDir', 'HShift_local', - 'speed', 'speed_local', 'acceleration', 'acceleration_local', 'T_node', 'water_level', 'pressure', 'load_value', 'AlfaX', 'AlfaY', 'CalcErr' + 'speed', 'speed_local', 'acceleration', 'acceleration_local', 'T_node', 'water_level', + 'pressure', 'load_value', 'AlfaX', 'AlfaY', 'CalcErr' union all select ToolNameID, EventDate, EventTime, NodeNum, NodeType, NodeDepth, XShift, YShift, ZShift , X, Y, Z, HShift, HShiftDir, HShift_local, @@ -133,7 +135,8 @@ async def get_elab_timestamp(id_recv: int, pool: object) -> float: except Exception as e: logger.error(f"id {id_recv} - Errore nella query timestamp elaborazione: {e}") return None - + + async def check_flag_elab(pool: object) -> None: async with pool.acquire() as conn: async with conn.cursor() as cur: diff --git a/src/utils/database/connection.py b/src/utils/database/connection.py index 269e36f..50def54 100644 --- a/src/utils/database/connection.py +++ b/src/utils/database/connection.py @@ -1,9 +1,11 @@ import logging + import mysql.connector from mysql.connector import Error logger = logging.getLogger(__name__) + def connetti_db(cfg: object) -> object: """ Establishes a connection to a MySQL database. @@ -21,14 +23,10 @@ def connetti_db(cfg: object) -> object: A MySQL connection object if the connection is successful, otherwise None. """ try: - conn = mysql.connector.connect(user=cfg.dbuser, - password=cfg.dbpass, - host=cfg.dbhost, - port=cfg.dbport, - database=cfg.dbname) + conn = mysql.connector.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport, database=cfg.dbname) conn.autocommit = True logger.info("Connected") return conn except Error as e: logger.error(f"Database connection error: {e}") - raise # Re-raise the exception to be handled by the caller \ No newline at end of file + raise # Re-raise the exception to be handled by the caller diff --git a/src/utils/database/loader_action.py b/src/utils/database/loader_action.py index 29e9db3..2f9baf4 100644 --- a/src/utils/database/loader_action.py +++ b/src/utils/database/loader_action.py @@ -1,10 +1,10 @@ #!.venv/bin/python -import logging import asyncio - -from utils.database import FLAG_TO_TIMESTAMP, BATCH_SIZE +import logging from datetime import datetime, timedelta +from utils.database import BATCH_SIZE, FLAG_TO_TIMESTAMP + logger = logging.getLogger(__name__) @@ -75,13 +75,15 @@ async def load_data(cfg: object, matrice_valori: list, pool: object, type: str) `ValD` = IF({cfg.dbrawdata}.`ValD` != new_data.ValD AND new_data.`ValD` IS NOT NULL, new_data.ValD, {cfg.dbrawdata}.`ValD`), `ValE` = IF({cfg.dbrawdata}.`ValE` != new_data.ValE AND new_data.`ValE` IS NOT NULL, new_data.ValE, {cfg.dbrawdata}.`ValE`), `ValF` = IF({cfg.dbrawdata}.`ValF` != new_data.ValF AND new_data.`ValF` IS NOT NULL, new_data.ValF, {cfg.dbrawdata}.`ValF`), - `BatLevelModule` = IF({cfg.dbrawdata}.`BatLevelModule` != new_data.BatLevelModule, new_data.BatLevelModule, {cfg.dbrawdata}.`BatLevelModule`), - `TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, {cfg.dbrawdata}.`TemperatureModule`), + `BatLevelModule` = IF({cfg.dbrawdata}.`BatLevelModule` != new_data.BatLevelModule, new_data.BatLevelModule, + {cfg.dbrawdata}.`BatLevelModule`), + `TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, + {cfg.dbrawdata}.`TemperatureModule`), `RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`), `Created_at` = NOW() """ - #logger.info(f"Query insert: {sql_load_RAWDATA}.") - #logger.info(f"Matrice valori da inserire: {matrice_valori}.") + # logger.info(f"Query insert: {sql_load_RAWDATA}.") + # logger.info(f"Matrice valori da inserire: {matrice_valori}.") rc = False async with pool.acquire() as conn: async with conn.cursor() as cur: @@ -90,12 +92,12 @@ async def load_data(cfg: object, matrice_valori: list, pool: object, type: str) logger.info(f"Loading data attempt {attempt + 1}.") for i in range(0, len(matrice_valori), BATCH_SIZE): - batch = matrice_valori[i:i + BATCH_SIZE] + batch = matrice_valori[i : i + BATCH_SIZE] await cur.executemany(sql_load_RAWDATA, batch) await conn.commit() - logger.info(f"Completed batch {i//BATCH_SIZE + 1}/{(len(matrice_valori)-1)//BATCH_SIZE + 1}") + logger.info(f"Completed batch {i // BATCH_SIZE + 1}/{(len(matrice_valori) - 1) // BATCH_SIZE + 1}") logger.info("Data loaded.") rc = True @@ -106,9 +108,7 @@ async def load_data(cfg: object, matrice_valori: list, pool: object, type: str) # logger.error(f"Matrice valori da inserire: {batch}.") if e.args[0] == 1213: # Deadlock detected - logger.warning( - f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}" - ) + logger.warning(f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}") if attempt < cfg.max_retries - 1: delay = 2 * attempt @@ -159,9 +159,7 @@ async def unlock(cfg: object, id: int, pool: object) -> None: async with pool.acquire() as conn: async with conn.cursor() as cur: try: - await cur.execute( - f"update {cfg.dbrectable} set locked = 0 where id = {id}" - ) + await cur.execute(f"update {cfg.dbrectable} set locked = 0 where id = {id}") await conn.commit() logger.info(f"id {id} unlocked.") except Exception as e: @@ -184,7 +182,8 @@ async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool: object) -> tup async with pool.acquire() as conn: async with conn.cursor() as cur: try: - await cur.execute(f'''select m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate + await cur.execute(f'''select m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, + u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate from matfuncs as m inner join tools as t on t.matfunc = m.id inner join units as u on u.id = t.unit_id @@ -194,6 +193,7 @@ async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool: object) -> tup except Exception as e: logger.error(f"Error: {e}") + async def find_nearest_timestamp(cfg: object, unit_tool_data: dict, pool: object) -> tuple: """ Finds the nearest timestamp in the raw data table based on a reference timestamp @@ -222,11 +222,12 @@ async def find_nearest_timestamp(cfg: object, unit_tool_data: dict, pool: object try: await cur.execute(f'''SELECT TIMESTAMP(`EventDate`, `EventTime`) AS event_timestamp, BatLevel, Temperature FROM {cfg.dbrawdata} - WHERE UnitName = "{unit_tool_data["unit"]}" AND ToolNameID = "{unit_tool_data["tool"]}" AND NodeNum = {unit_tool_data["node_num"]} + WHERE UnitName = "{unit_tool_data["unit"]}" AND ToolNameID = "{unit_tool_data["tool"]}" + AND NodeNum = {unit_tool_data["node_num"]} AND TIMESTAMP(`EventDate`, `EventTime`) BETWEEN "{start_timestamp}" AND "{end_timestamp}" ORDER BY ABS(TIMESTAMPDIFF(SECOND, TIMESTAMP(`EventDate`, `EventTime`), "{ref_timestamp}")) LIMIT 1 ''') return await cur.fetchone() except Exception as e: - logger.error(f"Error: {e}") \ No newline at end of file + logger.error(f"Error: {e}") diff --git a/src/utils/database/nodes_query.py b/src/utils/database/nodes_query.py index baddfec..b3d71ac 100644 --- a/src/utils/database/nodes_query.py +++ b/src/utils/database/nodes_query.py @@ -1,9 +1,10 @@ - -import aiomysql import logging +import aiomysql + logger = logging.getLogger(__name__) + async def get_nodes_type(cfg: object, tool: str, unit: str, pool: object) -> tuple: """Recupera le informazioni sui nodi (tipo, canali, input) per un dato strumento e unità. @@ -39,8 +40,8 @@ async def get_nodes_type(cfg: object, tool: str, unit: str, pool: object) -> tup else: channels, types, ains, dins = [], [], [], [] for row in results: - channels.append(row['channels']) - types.append(row['type']) - ains.append(row['ain']) - dins.append(row['din']) + channels.append(row["channels"]) + types.append(row["type"]) + ains.append(row["ain"]) + dins.append(row["din"]) return channels, types, ains, dins diff --git a/src/utils/general.py b/src/utils/general.py index b1fac0e..3abc673 100644 --- a/src/utils/general.py +++ b/src/utils/general.py @@ -1,11 +1,11 @@ import glob -import os -from itertools import cycle, chain - import logging +import os +from itertools import chain, cycle logger = logging.getLogger() + def alterna_valori(*valori: any, ping_pong: bool = False) -> any: """ Genera una sequenza ciclica di valori, con opzione per una sequenza "ping-pong". @@ -64,13 +64,13 @@ async def read_error_lines_from_logs(base_path: str, pattern: str) -> tuple[list for file_path in matching_files: try: - with open(file_path, 'r', encoding='utf-8') as file: + with open(file_path, encoding="utf-8") as file: lines = file.readlines() # Usando dict.fromkeys() per mantenere l'ordine e togliere le righe duplicate per i warnings non_empty_lines = [line.strip() for line in lines if line.strip()] - errors = [line for line in non_empty_lines if line.startswith('Error')] - warnings = list(dict.fromkeys(line for line in non_empty_lines if not line.startswith('Error'))) + errors = [line for line in non_empty_lines if line.startswith("Error")] + warnings = list(dict.fromkeys(line for line in non_empty_lines if not line.startswith("Error"))) except Exception as e: logger.error(f"Errore durante la lettura del file {file_path}: {e}") diff --git a/src/utils/orchestrator_utils.py b/src/utils/orchestrator_utils.py index c4255bf..c7b929a 100644 --- a/src/utils/orchestrator_utils.py +++ b/src/utils/orchestrator_utils.py @@ -1,9 +1,11 @@ -import logging import asyncio -import os -import aiomysql import contextvars -from typing import Callable, Coroutine, Any +import logging +import os +from collections.abc import Callable, Coroutine +from typing import Any + +import aiomysql # Crea una context variable per identificare il worker worker_context = contextvars.ContextVar("worker_id", default="^-^") @@ -35,9 +37,7 @@ def setup_logging(log_filename: str, log_level_str: str): """ logger = logging.getLogger() handler = logging.FileHandler(log_filename) - formatter = WorkerFormatter( - "%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s" - ) + formatter = WorkerFormatter("%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s") handler.setFormatter(formatter) # Rimuovi eventuali handler esistenti e aggiungi il nostro @@ -83,11 +83,7 @@ async def run_orchestrator( pool_recycle=3600, ) - tasks = [ - asyncio.create_task(worker_coro(i, cfg, pool)) - for i in range(cfg.max_threads) - ] - + tasks = [asyncio.create_task(worker_coro(i, cfg, pool)) for i in range(cfg.max_threads)] logger.info("Sistema avviato correttamente. In attesa di nuovi task...") @@ -101,4 +97,4 @@ async def run_orchestrator( logger.info("Info: Shutdown richiesto... chiusura in corso") except Exception as e: - logger.error(f"Errore principale: {e}", exc_info=debug_mode) \ No newline at end of file + logger.error(f"Errore principale: {e}", exc_info=debug_mode) diff --git a/src/utils/parsers/by_type/cr1000x_cr1000x.py b/src/utils/parsers/by_type/cr1000x_cr1000x.py index 34f3417..bb1efb2 100644 --- a/src/utils/parsers/by_type/cr1000x_cr1000x.py +++ b/src/utils/parsers/by_type/cr1000x_cr1000x.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'cr1000x_cr1000x'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") diff --git a/src/utils/parsers/by_type/d2w_d2w.py b/src/utils/parsers/by_type/d2w_d2w.py index 7c968eb..412bb06 100644 --- a/src/utils/parsers/by_type/d2w_d2w.py +++ b/src/utils/parsers/by_type/d2w_d2w.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'd2w_d2w'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") diff --git a/src/utils/parsers/by_type/g201_g201.py b/src/utils/parsers/by_type/g201_g201.py index 5ea7d37..e0c8413 100644 --- a/src/utils/parsers/by_type/g201_g201.py +++ b/src/utils/parsers/by_type/g201_g201.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as channels_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'g201_g201'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await channels_main_loader(cfg, id, pool,"channels") \ No newline at end of file + await channels_main_loader(cfg, id, pool, "channels") diff --git a/src/utils/parsers/by_type/g301_g301.py b/src/utils/parsers/by_type/g301_g301.py index 81ba5b7..7598b48 100644 --- a/src/utils/parsers/by_type/g301_g301.py +++ b/src/utils/parsers/by_type/g301_g301.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'g301_g301'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") diff --git a/src/utils/parsers/by_type/g801_iptm.py b/src/utils/parsers/by_type/g801_iptm.py index e3b0418..184cdcd 100644 --- a/src/utils/parsers/by_type/g801_iptm.py +++ b/src/utils/parsers/by_type/g801_iptm.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'g801_iptm'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") diff --git a/src/utils/parsers/by_type/g801_loc.py b/src/utils/parsers/by_type/g801_loc.py index 806e555..f4b46ea 100644 --- a/src/utils/parsers/by_type/g801_loc.py +++ b/src/utils/parsers/by_type/g801_loc.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as analog_dig_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'g801_loc'. diff --git a/src/utils/parsers/by_type/g801_mums.py b/src/utils/parsers/by_type/g801_mums.py index 9a307a9..bbd0af7 100644 --- a/src/utils/parsers/by_type/g801_mums.py +++ b/src/utils/parsers/by_type/g801_mums.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'g801_mums'. diff --git a/src/utils/parsers/by_type/g801_musa.py b/src/utils/parsers/by_type/g801_musa.py index 8c197e9..faafe39 100644 --- a/src/utils/parsers/by_type/g801_musa.py +++ b/src/utils/parsers/by_type/g801_musa.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as musa_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'g801_musa'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await musa_main_loader(cfg, id, pool, "musa") \ No newline at end of file + await musa_main_loader(cfg, id, pool, "musa") diff --git a/src/utils/parsers/by_type/g801_mux.py b/src/utils/parsers/by_type/g801_mux.py index 4715419..af0b0fa 100644 --- a/src/utils/parsers/by_type/g801_mux.py +++ b/src/utils/parsers/by_type/g801_mux.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as channels_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'g801_mux'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await channels_main_loader(cfg, id, pool, "channels") \ No newline at end of file + await channels_main_loader(cfg, id, pool, "channels") diff --git a/src/utils/parsers/by_type/g802_dsas.py b/src/utils/parsers/by_type/g802_dsas.py index 31f58e7..84195fc 100644 --- a/src/utils/parsers/by_type/g802_dsas.py +++ b/src/utils/parsers/by_type/g802_dsas.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'g802_dsas'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") diff --git a/src/utils/parsers/by_type/g802_gd.py b/src/utils/parsers/by_type/g802_gd.py index ec3f246..5cc8825 100644 --- a/src/utils/parsers/by_type/g802_gd.py +++ b/src/utils/parsers/by_type/g802_gd.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as gd_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'g802_gd'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await gd_main_loader(cfg, id, pool, "gd") \ No newline at end of file + await gd_main_loader(cfg, id, pool, "gd") diff --git a/src/utils/parsers/by_type/g802_loc.py b/src/utils/parsers/by_type/g802_loc.py index 115eed2..184d051 100644 --- a/src/utils/parsers/by_type/g802_loc.py +++ b/src/utils/parsers/by_type/g802_loc.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as analog_dig_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'g802_loc'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await analog_dig_main_loader(cfg, id, pool, "analogic_digital") \ No newline at end of file + await analog_dig_main_loader(cfg, id, pool, "analogic_digital") diff --git a/src/utils/parsers/by_type/g802_modb.py b/src/utils/parsers/by_type/g802_modb.py index 1116e5d..acde5ec 100644 --- a/src/utils/parsers/by_type/g802_modb.py +++ b/src/utils/parsers/by_type/g802_modb.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'g802_modb'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") diff --git a/src/utils/parsers/by_type/g802_mums.py b/src/utils/parsers/by_type/g802_mums.py index dfd9e1f..e86ae5f 100644 --- a/src/utils/parsers/by_type/g802_mums.py +++ b/src/utils/parsers/by_type/g802_mums.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'g802_mums'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") diff --git a/src/utils/parsers/by_type/g802_mux.py b/src/utils/parsers/by_type/g802_mux.py index e32f864..80f3126 100644 --- a/src/utils/parsers/by_type/g802_mux.py +++ b/src/utils/parsers/by_type/g802_mux.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as channels_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'g802_mux'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await channels_main_loader(cfg, id, pool, "channels") \ No newline at end of file + await channels_main_loader(cfg, id, pool, "channels") diff --git a/src/utils/parsers/by_type/gs1_gs1.py b/src/utils/parsers/by_type/gs1_gs1.py index 99b2da4..89ac539 100644 --- a/src/utils/parsers/by_type/gs1_gs1.py +++ b/src/utils/parsers/by_type/gs1_gs1.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as tlp_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'gs1_gs1'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await tlp_main_loader(cfg, id, pool, "tlp") \ No newline at end of file + await tlp_main_loader(cfg, id, pool, "tlp") diff --git a/src/utils/parsers/by_type/hirpinia_hirpinia.py b/src/utils/parsers/by_type/hirpinia_hirpinia.py index 7a6b90e..a7297c5 100644 --- a/src/utils/parsers/by_type/hirpinia_hirpinia.py +++ b/src/utils/parsers/by_type/hirpinia_hirpinia.py @@ -1,7 +1,7 @@ from utils.csv.loaders import main_old_script_loader as hirpinia_main_loader -async def main_loader(cfg: object, id: int, pool: object) -> None: +async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'hirpinia_hirpinia'. diff --git a/src/utils/parsers/by_type/hortus_hortus.py b/src/utils/parsers/by_type/hortus_hortus.py index b89acaf..71dc2f0 100644 --- a/src/utils/parsers/by_type/hortus_hortus.py +++ b/src/utils/parsers/by_type/hortus_hortus.py @@ -1,7 +1,7 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader -async def main_loader(cfg: object, id: int, pool: object) -> None: +async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'hortus_hortus'. @@ -13,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") diff --git a/src/utils/parsers/by_type/isi_csv_log_vulink.py b/src/utils/parsers/by_type/isi_csv_log_vulink.py index c66a7c8..0cf7757 100644 --- a/src/utils/parsers/by_type/isi_csv_log_vulink.py +++ b/src/utils/parsers/by_type/isi_csv_log_vulink.py @@ -1,7 +1,7 @@ from utils.csv.loaders import main_old_script_loader as vulink_main_loader -async def main_loader(cfg: object, id: int, pool: object) -> None: +async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'isi_csv_log_vulink'. diff --git a/src/utils/parsers/by_type/sisgeo_health.py b/src/utils/parsers/by_type/sisgeo_health.py index dc4132c..a16cbb4 100644 --- a/src/utils/parsers/by_type/sisgeo_health.py +++ b/src/utils/parsers/by_type/sisgeo_health.py @@ -1,7 +1,7 @@ from utils.csv.loaders import main_old_script_loader as sisgeo_main_loader -async def main_loader(cfg: object, id: int, pool: object) -> None: +async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'sisgeo_health'. diff --git a/src/utils/parsers/by_type/sisgeo_readings.py b/src/utils/parsers/by_type/sisgeo_readings.py index db0612f..9db7b9c 100644 --- a/src/utils/parsers/by_type/sisgeo_readings.py +++ b/src/utils/parsers/by_type/sisgeo_readings.py @@ -1,7 +1,7 @@ from utils.csv.loaders import main_old_script_loader as sisgeo_main_loader -async def main_loader(cfg: object, id: int, pool: object) -> None: +async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'sisgeo_readings'. diff --git a/src/utils/parsers/by_type/sorotecpini_co.py b/src/utils/parsers/by_type/sorotecpini_co.py index fb37b7a..231eccf 100644 --- a/src/utils/parsers/by_type/sorotecpini_co.py +++ b/src/utils/parsers/by_type/sorotecpini_co.py @@ -1,7 +1,7 @@ from utils.csv.loaders import main_old_script_loader as sorotecPini_main_loader -async def main_loader(cfg: object, id: int, pool: object) -> None: +async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'sorotecpini_co'. diff --git a/src/utils/parsers/by_type/stazionetotale_integrity_monitor.py b/src/utils/parsers/by_type/stazionetotale_integrity_monitor.py index 339fbf4..ae978c6 100644 --- a/src/utils/parsers/by_type/stazionetotale_integrity_monitor.py +++ b/src/utils/parsers/by_type/stazionetotale_integrity_monitor.py @@ -1,7 +1,7 @@ from utils.csv.loaders import main_old_script_loader as ts_pini_main_loader -async def main_loader(cfg: object, id: int, pool: object) -> None: +async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'stazionetotale_integrity_monitor'. diff --git a/src/utils/parsers/by_type/stazionetotale_messpunktepini.py b/src/utils/parsers/by_type/stazionetotale_messpunktepini.py index 7612bec..9fe1e1b 100644 --- a/src/utils/parsers/by_type/stazionetotale_messpunktepini.py +++ b/src/utils/parsers/by_type/stazionetotale_messpunktepini.py @@ -1,7 +1,7 @@ from utils.csv.loaders import main_old_script_loader as ts_pini_main_loader -async def main_loader(cfg: object, id: int, pool: object) -> None: +async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'stazionetotale_messpunktepini'. diff --git a/src/utils/parsers/by_type/tlp_loc.py b/src/utils/parsers/by_type/tlp_loc.py index 4ca8aa7..c338655 100644 --- a/src/utils/parsers/by_type/tlp_loc.py +++ b/src/utils/parsers/by_type/tlp_loc.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as analog_dig_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'tlp_loc'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await analog_dig_main_loader(cfg, id, pool, "analogic_digital") \ No newline at end of file + await analog_dig_main_loader(cfg, id, pool, "analogic_digital") diff --git a/src/utils/parsers/by_type/tlp_tlp.py b/src/utils/parsers/by_type/tlp_tlp.py index 3474c75..f72c58a 100644 --- a/src/utils/parsers/by_type/tlp_tlp.py +++ b/src/utils/parsers/by_type/tlp_tlp.py @@ -1,5 +1,6 @@ from utils.csv.loaders import main_loader as tlp_main_loader + async def main_loader(cfg: object, id: int, pool: object) -> None: """ Carica ed elabora i dati CSV specifici per il tipo 'tlp_tlp'. @@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None: id (int): L'ID del record CSV da elaborare. pool (object): Il pool di connessioni al database. """ - await tlp_main_loader(cfg, id, pool, "tlp") \ No newline at end of file + await tlp_main_loader(cfg, id, pool, "tlp") diff --git a/src/utils/timestamp/date_check.py b/src/utils/timestamp/date_check.py index 615f954..c2be694 100644 --- a/src/utils/timestamp/date_check.py +++ b/src/utils/timestamp/date_check.py @@ -1,6 +1,7 @@ from datetime import datetime -def normalizza_data(data_string: str)->str: + +def normalizza_data(data_string: str) -> str: """ Normalizza una stringa di data al formato YYYY-MM-DD, provando diversi formati di input. @@ -12,7 +13,12 @@ def normalizza_data(data_string: str)->str: o None se la stringa non può essere interpretata come una data. """ formato_desiderato = "%Y-%m-%d" - formati_input = ["%Y/%m/%d", "%Y-%m-%d", "%d-%m-%Y","%d/%m/%Y", ] # Ordine importante: prova prima il più probabile + formati_input = [ + "%Y/%m/%d", + "%Y-%m-%d", + "%d-%m-%Y", + "%d/%m/%Y", + ] # Ordine importante: prova prima il più probabile for formato_input in formati_input: try: @@ -23,6 +29,7 @@ def normalizza_data(data_string: str)->str: return None # Se nessun formato ha avuto successo + def normalizza_orario(orario_str): try: # Prova prima con HH:MM:SS @@ -34,4 +41,4 @@ def normalizza_orario(orario_str): dt = datetime.strptime(orario_str, "%H:%M") return dt.strftime("%H:%M:%S") except ValueError: - return orario_str # Restituisce originale se non parsabile \ No newline at end of file + return orario_str # Restituisce originale se non parsabile