From e9dc7c1192546a215a50662e8bab472d817975aa Mon Sep 17 00:00:00 2001
From: alex
Date: Sat, 3 May 2025 15:40:58 +0200
Subject: [PATCH] evol4

Add type hints, docstrings, and explanatory comments across the FTP
receiver, orchestrator, and parser modules, and change connetti_db()
to re-raise connection errors instead of exiting the process.

---
 ftp_csv_receiver.py               | 21 ++++++-----
 orchestrator.py                   | 32 ++++++++++++++--
 utils/database/connection.py      | 18 ++++++---
 utils/parsers/data_preparation.py | 63 +++++++++++++++++++++++++++++--
 utils/parsers/g801_mums.py        |  6 ++-
 5 files changed, 117 insertions(+), 23 deletions(-)

diff --git a/ftp_csv_receiver.py b/ftp_csv_receiver.py
index 4d15861..6241eeb 100755
--- a/ftp_csv_receiver.py
+++ b/ftp_csv_receiver.py
@@ -14,12 +14,14 @@ from pyftpdlib.handlers import FTPHandler
 from pyftpdlib.servers import FTPServer
 from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed
 
+# Logging is configured in main(), once the configuration has been loaded.
+
 logger = logging.getLogger(__name__)
 
 class DummySha256Authorizer(DummyAuthorizer):
     """Custom authorizer that uses SHA256 for password hashing and manages users from a database."""
 
-    def __init__(self, cfg):
+    def __init__(self, cfg: object) -> None:
         """Initializes the authorizer, adds the admin user, and loads users from the database.
 
         Args:
@@ -47,7 +49,7 @@ class DummySha256Authorizer(DummyAuthorizer):
             except Exception as e:
                 self.responde(f'551 Error in create virtual user path: {e}')
 
-    def validate_authentication(self, username, password, handler):
+    def validate_authentication(self, username: str, password: str, handler: object) -> None:
         # Validate the user's password against the stored hash
         hash = sha256(password.encode("UTF-8")).hexdigest()
         try:
@@ -59,7 +61,7 @@ class ASEHandler(FTPHandler):
     """Custom FTP handler that extends FTPHandler with custom commands and file handling."""
 
-    def __init__(self, conn, server, ioloop=None):
+    def __init__(self, conn: object, server: object, ioloop=None) -> None:
         """Initializes the handler, adds custom commands, and sets up command permissions.
 
         Args:
@@ -87,26 +89,26 @@ class ASEHandler(FTPHandler):
                  help='Syntax: SITE LSTU (list virtual users).')}
         )
 
-    def on_file_received(self, file):
+    def on_file_received(self, file: str) -> None:
         return file_management.on_file_received(self, file)
 
-    def on_incomplete_file_received(self, file):
+    def on_incomplete_file_received(self, file: str) -> None:
         """Removes partially uploaded files.
 
         Args:
            file: The path to the incomplete file.
        """
        os.remove(file)
 
-    def ftp_SITE_ADDU(self, line):
+    def ftp_SITE_ADDU(self, line: str) -> None:
         return user_admin.ftp_SITE_ADDU(self, line)
 
-    def ftp_SITE_DISU(self, line):
+    def ftp_SITE_DISU(self, line: str) -> None:
         return user_admin.ftp_SITE_DISU(self, line)
 
-    def ftp_SITE_ENAU(self, line):
+    def ftp_SITE_ENAU(self, line: str) -> None:
         return user_admin.ftp_SITE_ENAU(self, line)
 
-    def ftp_SITE_LSTU(self, line):
+    def ftp_SITE_LSTU(self, line: str) -> None:
         return user_admin.ftp_SITE_LSTU(self, line)
 
 def main():
@@ -128,6 +130,7 @@ def main():
     # Configure logging
     logging.basicConfig(
         format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
+        # basicConfig opens (and creates, if missing) the log file in append mode
        filename=cfg.logfilename,
        level=logging.INFO,
    )
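A minimal sketch (not part of the patch) of the password convention
DummySha256Authorizer expects: stored hashes are hex-encoded SHA-256
digests of the plain-text password, exactly as computed in
validate_authentication(). The helper name is hypothetical; it just
generates the value to store when provisioning a virtual user:

    from hashlib import sha256

    def password_digest(password: str) -> str:
        # Same transformation validate_authentication() applies before comparing
        return sha256(password.encode("UTF-8")).hexdigest()

    # Example: value to store for a new virtual user
    print(password_digest("s3cret"))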
""" os.remove(file) - def ftp_SITE_ADDU(self, line): + def ftp_SITE_ADDU(self: object, line: str) -> None: return user_admin.ftp_SITE_ADDU(self, line) - def ftp_SITE_DISU(self, line): + def ftp_SITE_DISU(self: object, line: str) -> None: return user_admin.ftp_SITE_DISU(self, line) - def ftp_SITE_ENAU(self, line): + def ftp_SITE_ENAU(self: object, line: str) -> None: return user_admin.ftp_SITE_ENAU(self, line) - def ftp_SITE_LSTU(self, line): + def ftp_SITE_LSTU(self: object, line: str) -> None: return user_admin.ftp_SITE_LSTU(self, line) def main(): @@ -128,6 +130,7 @@ def main(): # Configure logging logging.basicConfig( format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ", + # Use cfg.logfilename directly without checking its existence filename=cfg.logfilename, level=logging.INFO, ) diff --git a/orchestrator.py b/orchestrator.py index 60b3574..02384c0 100755 --- a/orchestrator.py +++ b/orchestrator.py @@ -1,41 +1,57 @@ #!.venv/bin/python +# Import necessary libraries import mysql.connector import logging import importlib import time import threading +# Import custom modules for configuration and database connection from utils.config import loader as setting from utils.database.connection import connetti_db +# Initialize the logger for this module logger = logging.getLogger(__name__) -def elab_csv(cfg, threads): +# Function to elaborate CSV data +def elab_csv(cfg: object, threads: list) -> bool: try: + # Establish a database connection with connetti_db(cfg) as conn: cur = conn.cursor() + # Select a single record from the raw data table that is not currently locked and has a status of 0 cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = 0 limit 1') id, unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone() if id: + # If a record is found, lock it by updating the 'locked' field to 1 cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 1 where id = {id}') + # Construct the module name based on unit and tool types for dynamic import module_name = f'utils.parsers.{unit_type.lower()}_{tool_type.lower()}' + # Dynamically import the module modulo = importlib.import_module(module_name) + # Get the 'main_loader' function from the imported module funzione = getattr(modulo, "main_loader") - # Chiamare la funzione + # Create a new thread to execute the 'main_loader' function with the configuration and record ID thread = threading.Thread(target = funzione, args=(cfg, id)) + # Add the thread to the list of active threads threads.append(thread) + # Start the thread's execution thread.start() + # Return True to indicate that a record was processed return True else: + # If no record is found, wait for 20 seconds before attempting to fetch again time.sleep(20) + # Return False to indicate that no record was processed return False except mysql.connector.Error as e: + # Handle database connection errors print(f"Error: {e}") logger.error(f'{e}') @@ -44,24 +60,32 @@ def main(): cfg = setting.Config() try: - # Configura la connessione al database PostgreSQL - # Configure logging + # Configure logging to write log messages to a file with a specific format logging.basicConfig( format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ", filename=cfg.logfilename, level=logging.INFO, ) + # Initialize an empty list to keep track of active threads threads = [] + # Enter an infinite loop to continuously process records while True: + # Check if the number of 
diff --git a/utils/database/connection.py b/utils/database/connection.py
index bfb7f6a..235e0e7 100644
--- a/utils/database/connection.py
+++ b/utils/database/connection.py
@@ -4,13 +4,20 @@ import mysql.connector
 logger = logging.getLogger(__name__)
 
 def connetti_db(cfg):
-    """Establishes a connection to the MySQL database.
+    """
+    Establishes a connection to a MySQL database.
 
     Args:
-        cfg: The configuration object containing database connection details.
+        cfg: A configuration object containing database connection parameters.
+            It must provide the following attributes:
+            - dbuser: the database username
+            - dbpass: the database password
+            - dbhost: the database host address
+            - dbport: the database port number
+            - dbname: the name of the database to connect to
 
     Returns:
-        A MySQL database connection object.
+        A MySQL connection object; raises mysql.connector.Error on failure.
     """
     try:
         conn = mysql.connector.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport, database=cfg.dbname)
@@ -18,6 +25,5 @@ def connetti_db(cfg):
         logger.info("Connected")
         return conn
     except mysql.connector.Error as e:
-        logger.error(f'{e}')
-        exit(e.errno)
-        return None
\ No newline at end of file
+        logger.error(f"Database connection error: {e}")
+        raise  # Let the caller decide how to handle the failure
\ No newline at end of file
diff --git a/utils/parsers/data_preparation.py b/utils/parsers/data_preparation.py
index 7037c08..6e00764 100644
--- a/utils/parsers/data_preparation.py
+++ b/utils/parsers/data_preparation.py
@@ -6,7 +6,22 @@ import re
 logger = logging.getLogger(__name__)
 
-def get_data(cfg, id):
+def get_data(cfg: object, id: int) -> tuple:
+    """
+    Retrieves the raw data for a single tool record from the database.
+
+    Connects to the database using the provided configuration and reads
+    the unit name, tool name, and tool data for the record with the
+    given ID from the raw data table.
+
+    Args:
+        cfg: A configuration object containing database connection parameters
+            and table names (cfg.dbname, cfg.dbrectable).
+        id: The ID of the tool record to retrieve.
+
+    Returns:
+        A tuple (unit_name, tool_name, tool_data).
+    """
     with connetti_db(cfg) as conn:
         cur = conn.cursor()
         cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbname}.{cfg.dbrectable} where id = {id}')
@@ -15,7 +30,28 @@ def get_data(cfg, id):
     conn.close()
     return unit_name, tool_name, tool_data
 
-def make_matrix(cfg, id):
+def make_matrix(cfg: object, id: int) -> list:
+    """
+    Transforms raw tool data into a matrix ready for database insertion.
+
+    Retrieves the raw tool data with `get_data`, splits it into individual
+    readings (rows), and parses each reading into individual values. A
+    single reading may contain values reported by several nodes. The
+    result is a list of lists, where each inner list is one row ready for
+    insertion into the database; short rows are padded with `None` so
+    that every row has the same length.
+
+    Args:
+        cfg: A configuration object containing database connection parameters
+            and table names.
+        id: The ID of the tool record to process.
+
+    Returns:
+        A list of rows (the matrix). Each inner list contains: UnitName,
+        ToolNameID, NodeNum, EventDate, EventTime, BatLevel, Temperature,
+        followed by up to 16 measurement values (Val0 to ValF), padded
+        with None if necessary.
+    """
     UnitName, ToolNameID, ToolData = get_data(cfg, id)
     righe = ToolData.splitlines()
     matrice_valori = []
@@ -29,7 +65,28 @@ def make_matrix(cfg, id):
 
     return matrice_valori
 
-def make_loc_matrix(cfg, id):
+def make_loc_matrix(cfg: object, id: int) -> list:
+    """
+    Transforms raw location (LOC) tool data into a matrix for database insertion.
+
+    Retrieves the raw LOC tool data with `get_data`, splits it into
+    individual readings (rows), and parses each reading into the values of
+    the LOC format: timestamp, battery level, temperature, and four
+    additional values (ain1, ain2, din1, din2). A regular expression
+    filters out lines that do not match the expected LOC format. The
+    result is a list of lists, where each inner list is one row ready for
+    insertion into the database; short rows are padded with `None` so
+    that every row has the same length.
+
+    Args:
+        cfg: A configuration object containing database connection parameters
+            and table names.
+        id: The ID of the tool record to process.
+
+    Returns:
+        A list of rows representing the processed LOC data, with fields
+        analogous to those produced by `make_matrix`.
+    """
     UnitName, ToolNameID, ToolData = get_data(cfg, id)
     righe = ToolData.splitlines()
     matrice_valori = []
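The padding described in the make_matrix docstring can be pictured with
a small standalone helper (hypothetical, not from the patch): rows
carry 7 fixed fields plus up to 16 measurement slots (Val0 to ValF),
and shorter readings are extended with None so every row inserts into
the same column list:

    # 7 fixed fields: UnitName, ToolNameID, NodeNum, EventDate, EventTime, BatLevel, Temperature
    FIXED_FIELDS = 7
    VALUE_SLOTS = 16  # Val0 .. ValF

    def pad_row(row: list) -> list:
        # Extend short rows with None up to the full column count
        return row + [None] * (FIXED_FIELDS + VALUE_SLOTS - len(row))

    assert len(pad_row(['U1', 42, 1, '2025-05-03', '15:40', 3.6, 21.5])) == 23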
diff --git a/utils/parsers/g801_mums.py b/utils/parsers/g801_mums.py
index 4625994..f01c8f5 100644
--- a/utils/parsers/g801_mums.py
+++ b/utils/parsers/g801_mums.py
@@ -1,10 +1,14 @@
 #!.venv/bin/python
+# Parser entry point for G801 units reporting MUMS tool data
 from utils.database.loader import load_data
 from utils.parsers.data_preparation import make_matrix
 import logging
 
 logger = logging.getLogger(__name__)
 
-def main_loader(cfg, id):
+# Called by the orchestrator, in a worker thread, for each locked record
+def main_loader(cfg: object, id: int) -> None:
+    # Build the value matrix from the raw tool data
     matrice_valori = make_matrix(cfg, id)
+    # Insert the matrix into the database
     load_data(cfg, matrice_valori)
\ No newline at end of file
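Since connetti_db() now re-raises instead of calling exit(), callers
own the failure policy. A sketch of a retrying caller under that
contract (the helper name, retry count, and delay are illustrative):

    import time
    import mysql.connector
    from utils.database.connection import connetti_db

    def connect_with_retry(cfg, attempts: int = 3, delay: float = 5.0):
        for tentativo in range(attempts):
            try:
                return connetti_db(cfg)
            except mysql.connector.Error:
                if tentativo == attempts - 1:
                    raise  # out of attempts: propagate to the caller
                time.sleep(delay)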