diff --git a/src/send_orchestrator.py b/src/send_orchestrator.py
index 83fd0b8..a6c247f 100755
--- a/src/send_orchestrator.py
+++ b/src/send_orchestrator.py
@@ -24,6 +24,17 @@
 ELAB_PROCESSING_DELAY = 0.2
 NO_RECORD_SLEEP = 30
 
 async def worker(worker_id: int, cfg: object, pool: object) -> None:
+    """Runs the work loop for sending data.
+
+    The worker pulls a record from the database that marks data as ready
+    to be sent (both raw and elaborated), processes it, and then waits
+    before starting a new cycle.
+
+    Args:
+        worker_id (int): The worker's unique ID.
+        cfg (object): The configuration object.
+        pool (object): The database connection pool.
+    """
     # Imposta il context per questo worker
     worker_context.set(f"W{worker_id:02d}")
diff --git a/src/utils/config/loader_ftp_csv.py b/src/utils/config/loader_ftp_csv.py
index d275d97..217195e 100644
--- a/src/utils/config/loader_ftp_csv.py
+++ b/src/utils/config/loader_ftp_csv.py
@@ -5,6 +5,10 @@ from configparser import ConfigParser
 
 class Config:
     def __init__(self):
+        """
+        Initializes the configuration by reading 'ftp.ini' and 'db.ini',
+        which provide the FTP server, CSV, logging, and database settings.
+        """
         c = ConfigParser()
         c.read(["../env/ftp.ini", "../env/db.ini"])
 
@@ -71,4 +75,3 @@
             for item in c.get("ts_pini", "path_match").split('|')
             for key, value in [item.split(':', 1)]
         }
-
diff --git a/src/utils/config/loader_load_data.py b/src/utils/config/loader_load_data.py
index c03fc70..77dd3d4 100644
--- a/src/utils/config/loader_load_data.py
+++ b/src/utils/config/loader_load_data.py
@@ -5,6 +5,10 @@ from configparser import ConfigParser
 
 class Config:
     def __init__(self):
+        """
+        Initializes the configuration by reading 'load.ini' and 'db.ini',
+        which provide the logging, worker, database, and table settings.
+        """
         c = ConfigParser()
         c.read(["../env/load.ini", "../env/db.ini"])
 
@@ -29,4 +33,2 @@ class Config:
         self.dbrawdata = c.get("tables", "rawTableName")
-        self.dbrawdata = c.get("tables", "rawTableName")
         self.dbnodes = c.get("tables", "nodesTableName")
-
diff --git a/src/utils/config/loader_matlab_elab.py b/src/utils/config/loader_matlab_elab.py
index eb3cc4b..1d3bee9 100644
--- a/src/utils/config/loader_matlab_elab.py
+++ b/src/utils/config/loader_matlab_elab.py
@@ -5,6 +5,10 @@ from configparser import ConfigParser
 
 class Config:
     def __init__(self):
+        """
+        Initializes the configuration by reading 'elab.ini' and 'db.ini', which
+        provide the logging, worker, database, table, tool, and MATLAB settings.
+        """
         c = ConfigParser()
         c.read(["../env/elab.ini", "../env/db.ini"])
 
diff --git a/src/utils/config/loader_send_data.py b/src/utils/config/loader_send_data.py
index df04f08..9c3c16d 100644
--- a/src/utils/config/loader_send_data.py
+++ b/src/utils/config/loader_send_data.py
@@ -5,6 +5,10 @@ from configparser import ConfigParser
 
 class Config:
     def __init__(self):
+        """
+        Initializes the configuration by reading 'send.ini' and 'db.ini',
+        which provide the logging, worker, database, and table settings.
+        """
         c = ConfigParser()
         c.read(["../env/send.ini", "../env/db.ini"])
 
diff --git a/src/utils/config/users_loader.py b/src/utils/config/users_loader.py
index e48eb3f..2e66023 100644
--- a/src/utils/config/users_loader.py
+++ b/src/utils/config/users_loader.py
@@ -4,6 +4,12 @@ from configparser import ConfigParser
 
 
 class Config:
+    """
+    Loads the database configuration used to manage FTP users.
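+
+    Connection parameters (including password, dbName, and maxRetries) are
+    read from the [db] section of the configuration files.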
+ """ def __init__(self): c = ConfigParser() @@ -16,5 +19,3 @@ class Config: self.dbpass = c.get("db", "password") self.dbname = c.get("db", "dbName") self.max_retries = c.getint("db", "maxRetries") - - diff --git a/src/utils/connect/file_management.py b/src/utils/connect/file_management.py index 63cb88e..8fb080d 100644 --- a/src/utils/connect/file_management.py +++ b/src/utils/connect/file_management.py @@ -10,11 +10,16 @@ from utils.csv.parser import extract_value logger = logging.getLogger(__name__) def on_file_received(self: object, file: str) -> None: - """Handles the event when a file is successfully received. + """ + Processes a received file, extracts relevant information, and inserts it into the database. + + If the file is empty, it is removed. Otherwise, it extracts unit and tool + information from the filename and the first few lines of the CSV, handles + aliases, and then inserts the data into the configured database table. Args: - file: The path to the received file. - """ + file (str): The path to the received file.""" + if not os.stat(file).st_size: os.remove(file) logger.info(f'File {file} is empty: removed.') diff --git a/src/utils/connect/send_data.py b/src/utils/connect/send_data.py index 5f4c76e..a9856e4 100644 --- a/src/utils/connect/send_data.py +++ b/src/utils/connect/send_data.py @@ -193,20 +193,16 @@ async def process_workflow_record(record: tuple, fase: int, cfg: dict, pool: obj await unlock(cfg, id, pool) -def _should_process_old(tool_elab_info, timestamp_matlab_elab): - """Verifica se il record può essere processato basandosi sulla due date.""" - duedate = tool_elab_info.get("duedate") +def _should_process(tool_elab_info: dict, timestamp_matlab_elab: datetime) -> bool: + """ + Determines if a record should be processed based on its due date. - if not duedate or duedate in ('0000-00-00 00:00:00', ''): - return True + Args: + tool_elab_info (dict): A dictionary containing information about the tool and its due date. + timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration. - # Se timestamp_matlab_elab è None/null, usa il timestamp corrente - comparison_timestamp = timestamp_matlab_elab if timestamp_matlab_elab is not None else datetime.now() - - return duedate > comparison_timestamp - - -def _should_process(tool_elab_info, timestamp_matlab_elab): + Returns: + bool: True if the record should be processed, False otherwise.""" """Verifica se il record può essere processato basandosi sulla due date.""" duedate = tool_elab_info.get("duedate") @@ -229,13 +225,26 @@ def _should_process(tool_elab_info, timestamp_matlab_elab): -async def _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name, - timestamp_matlab_elab, pool): +async def _route_by_phase(fase: int, tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, + timestamp_matlab_elab: datetime, pool: object) -> bool: """ - Gestisce il routing delle operazioni in base alla fase del workflow. + Routes the processing of a workflow record based on the current phase. + + This function acts as a dispatcher, calling the appropriate handler function + for sending elaborated data or raw data based on the `fase` (phase) parameter. + + Args: + fase (int): The current phase of the workflow (e.g., WorkflowFlags.SENT_ELAB_DATA, WorkflowFlags.SENT_RAW_DATA). + tool_elab_info (dict): A dictionary containing information about the tool and its elaboration status. + cfg (dict): The configuration dictionary. + id (int): The ID of the record being processed. 
+        unit_name (str): The name of the unit associated with the data.
+        tool_name (str): The name of the tool associated with the data.
+        timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
+        pool (object): The database connection pool.
 
     Returns:
-        bool: True se l'operazione è riuscita, False altrimenti
-    """
+        bool: True if the data sending operation was successful or no action was needed, False otherwise.
+    """
     if fase == WorkflowFlags.SENT_ELAB_DATA:
         return await _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name,
                                              timestamp_matlab_elab, pool)
 
@@ -250,10 +259,28 @@ async def _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name,
     return True
 
 
-async def _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name, tool_name,
-                                  timestamp_matlab_elab, pool):
-    """Gestisce la fase di invio dati elaborati."""
+async def _handle_elab_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str,
+                                  timestamp_matlab_elab: datetime, pool: object) -> bool:
+    """
+    Handles the phase of sending elaborated data.
+
+    This function checks if elaborated data needs to be sent via FTP or API
+    based on the `tool_elab_info` and calls the appropriate sending function.
+
+    Args:
+        tool_elab_info (dict): A dictionary containing information about the tool and its elaboration status,
+                               including flags for FTP and API sending.
+        cfg (dict): The configuration dictionary.
+        id (int): The ID of the record being processed.
+        unit_name (str): The name of the unit associated with the data.
+        tool_name (str): The name of the tool associated with the data.
+        timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration.
+        pool (object): The database connection pool.
+
+    Returns:
+        bool: True if the data sending operation was successful or no action was needed, False otherwise.
+    """
     # FTP send per dati elaborati
     if tool_elab_info.get('ftp_send'):
         return await _send_elab_data_ftp(cfg, id, unit_name, tool_name,
                                          timestamp_matlab_elab, pool)
 
@@ -267,8 +294,30 @@
     return True
 
 
-async def _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, pool):
-    """Gestisce la fase di invio dati raw."""
+async def _handle_raw_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, pool: object) -> bool:
+    """
+    Handles the phase of sending raw data.
+
+    This function checks if raw data needs to be sent via FTP or API
+    based on the `tool_elab_info` and calls the appropriate sending function.
+
+    Args:
+        tool_elab_info (dict): A dictionary containing information about the tool and its raw data sending status,
+                               including flags for FTP and API sending.
+        cfg (dict): The configuration dictionary.
+        id (int): The ID of the record being processed.
+        unit_name (str): The name of the unit associated with the data.
+        tool_name (str): The name of the tool associated with the data.
+        pool (object): The database connection pool.
+
+    Returns:
+        bool: True if the data sending operation was successful or no action was needed, False otherwise.
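+
+    Example (illustrative flag handling):
+        tool_elab_info = {'ftp_send_raw': True, 'api_send_raw': False}
+        # -> the raw CSV is sent via FTP; the API branch is skipped.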
+ """ + # FTP send per dati raw if tool_elab_info.get('ftp_send_raw'): @@ -281,22 +325,38 @@ async def _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, return True -def _should_send_elab_api(tool_elab_info): +def _should_send_elab_api(tool_elab_info: dict) -> bool: """Verifica se i dati elaborati devono essere inviati via API.""" return (tool_elab_info.get('inoltro_api') and tool_elab_info.get('api_send') and tool_elab_info.get('inoltro_api_url', '').strip()) -def _should_send_raw_api(tool_elab_info): +def _should_send_raw_api(tool_elab_info: dict) -> bool: """Verifica se i dati raw devono essere inviati via API.""" return (tool_elab_info.get('inoltro_api_raw') and tool_elab_info.get('api_send_raw') and tool_elab_info.get('inoltro_api_url_raw', '').strip()) -async def _send_elab_data_ftp(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool): - """Invia dati elaborati via FTP.""" +async def _send_elab_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object) -> bool: + """ + Sends elaborated data via FTP. + + This function retrieves the elaborated CSV data and attempts to send it + to the customer via FTP. It logs success or failure. + + Args: + cfg (dict): The configuration dictionary. + id (int): The ID of the record being processed. + unit_name (str): The name of the unit associated with the data. + tool_name (str): The name of the tool associated with the data. + timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration. + pool (object): The database connection pool. + + Returns: + bool: True if the FTP sending was successful, False otherwise. + """ try: elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool) @@ -316,8 +376,24 @@ async def _send_elab_data_ftp(cfg, id, unit_name, tool_name, timestamp_matlab_el return False -async def _send_elab_data_api(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool): - """Invia dati elaborati via API.""" +async def _send_elab_data_api(cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object) -> bool: + """ + Sends elaborated data via API. + + This function retrieves the elaborated CSV data and attempts to send it + to the customer via an API. It logs success or failure. + + Args: + cfg (dict): The configuration dictionary. + id (int): The ID of the record being processed. + unit_name (str): The name of the unit associated with the data. + tool_name (str): The name of the tool associated with the data. + timestamp_matlab_elab (datetime): The timestamp of the last MATLAB elaboration. + pool (object): The database connection pool. + + Returns: + bool: True if the API sending was successful, False otherwise. + """ try: elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool) @@ -337,8 +413,23 @@ async def _send_elab_data_api(cfg, id, unit_name, tool_name, timestamp_matlab_el return False -async def _send_raw_data_ftp(cfg, id, unit_name, tool_name, pool): - """Invia dati raw via FTP.""" +async def _send_raw_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, pool: object) -> bool: + """ + Sends raw data via FTP. + + This function attempts to send raw CSV data to the customer via FTP. + It logs success or failure. + + Args: + cfg (dict): The configuration dictionary. + id (int): The ID of the record being processed. + unit_name (str): The name of the unit associated with the data. 
+        tool_name (str): The name of the tool associated with the data.
+        pool (object): The database connection pool.
+
+    Returns:
+        bool: True if the FTP sending was successful, False otherwise.
+    """
     try:
         # if await ftp_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool):
         if True:  # Placeholder per test
 
@@ -352,8 +448,23 @@
     return False
 
 
-async def _send_raw_data_api(cfg, id, unit_name, tool_name, pool):
-    """Invia dati raw via API."""
+async def _send_raw_data_api(cfg: dict, id: int, unit_name: str, tool_name: str, pool: object) -> bool:
+    """
+    Sends raw data via API.
+
+    This function attempts to send raw CSV data to the customer via an API.
+    It logs success or failure.
+
+    Args:
+        cfg (dict): The configuration dictionary.
+        id (int): The ID of the record being processed.
+        unit_name (str): The name of the unit associated with the data.
+        tool_name (str): The name of the tool associated with the data.
+        pool (object): The database connection pool.
+
+    Returns:
+        bool: True if the API sending was successful, False otherwise.
+    """
     try:
         # if await api_send_raw_csv_to_customer(cfg, id, unit_name, tool_name, pool):
         if True:  # Placeholder per test
diff --git a/src/utils/csv/loaders.py b/src/utils/csv/loaders.py
index 83eab59..5caebed 100644
--- a/src/utils/csv/loaders.py
+++ b/src/utils/csv/loaders.py
@@ -38,8 +38,25 @@ async def main_loader(cfg: object, id: int, pool: object, action: str) -> None:
         logger.warning(f"Action '{action}' non riconosciuta.")
 
 
-async def get_next_csv_atomic(pool, table_name, status, next_status):
-    """Preleva atomicamente il prossimo CSV da elaborare"""
+async def get_next_csv_atomic(pool: object, table_name: str, status: int, next_status: int) -> tuple:
+    """
+    Retrieves the next available CSV record for processing in an atomic manner.
+
+    This function acquires a database connection from the pool, begins a transaction,
+    and attempts to select and lock a single record from the specified table that
+    matches the given status and has not yet reached the next_status. It uses
+    `SELECT ... FOR UPDATE SKIP LOCKED` to ensure atomicity and prevent other workers
+    from processing the same record concurrently.
+
+    Args:
+        pool (object): The database connection pool.
+        table_name (str): The name of the table to query.
+        status (int): The current status flag that the record must have.
+        next_status (int): The status flag that the record must not have yet.
+
+    Returns:
+        tuple: The next matching record if one is available, otherwise None.
+    """
     async with pool.acquire() as conn:
         # IMPORTANTE: Disabilita autocommit per questa transazione
         await conn.begin()
diff --git a/src/utils/csv/parser.py b/src/utils/csv/parser.py
index d2a3f2c..1a0f408 100644
--- a/src/utils/csv/parser.py
+++ b/src/utils/csv/parser.py
@@ -1,7 +1,28 @@
 import re
 
 
-def extract_value(patterns: list, primary_source: str, secondary_source: str = None, default='Not Defined') -> str:
+def extract_value(patterns: list, primary_source: str, secondary_source: str = None, default: str = 'Not Defined') -> str:
+    """
+    Extracts a value from a given source (or sources) based on a list of regex patterns.
+
+    It iterates through the provided patterns and attempts to find a match in the
+    primary source first, then in the secondary source if provided. The first
+    successful match is returned. If no match is found after checking all sources
+    with all patterns, a default value is returned.
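+
+    Example (illustrative, assuming one capture group per pattern):
+        extract_value([r'Unit[:=]\s*(\w+)'], "Unit: U001")
+        # -> 'U001'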
+
+    Args:
+        patterns (list): A list of regular expression strings to search for.
+        primary_source (str): The main string to search within.
+        secondary_source (str, optional): An additional string to search within if no match is found in the primary source. Defaults to None.
+        default (str, optional): The value to return if no match is found. Defaults to 'Not Defined'.
+
+    Returns:
+        str: The first matched value, or the default value if no match is found.
+    """
     for source in [source for source in (primary_source, secondary_source) if source is not None]:
         for pattern in patterns:
             matches = re.findall(pattern, source, re.IGNORECASE)
diff --git a/src/utils/parsers/by_type/hirpinia_hirpinia.py b/src/utils/parsers/by_type/hirpinia_hirpinia.py
index 4584809..52470b5 100644
--- a/src/utils/parsers/by_type/hirpinia_hirpinia.py
+++ b/src/utils/parsers/by_type/hirpinia_hirpinia.py
@@ -11,7 +11,19 @@ import logging
 logger = logging.getLogger(__name__)
 
 async def main_loader(cfg: object, id: int, pool: object) -> None:
+    """
+    Loads and processes CSV data specific to the 'hirpinia_hirpinia' type.
+
+    This function retrieves CSV data, writes it to a temporary file,
+    executes an external Python script ('hirpiniaLoadScript.py') to process it,
+    and then updates the workflow status in the database.
+
+    Args:
+        cfg (object): The configuration object.
+        id (int): The ID of the CSV record to process.
+        pool (object): The database connection pool.
+    """
     filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
 
     # Creare un file temporaneo
     with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file:
diff --git a/src/utils/parsers/by_type/isi_csv_log_vulink.py b/src/utils/parsers/by_type/isi_csv_log_vulink.py
index c153ea1..7b3099b 100644
--- a/src/utils/parsers/by_type/isi_csv_log_vulink.py
+++ b/src/utils/parsers/by_type/isi_csv_log_vulink.py
@@ -11,6 +11,18 @@ import logging
 logger = logging.getLogger(__name__)
 
 async def main_loader(cfg: object, id: int, pool: object) -> None:
+    """
+    Loads and processes CSV data specific to the 'isi_csv_log_vulink' type.
+
+    This function retrieves CSV data, writes it to a temporary file,
+    executes an external Python script ('vulinkScript.py') to process it,
+    and then updates the workflow status in the database.
+
+    Args:
+        cfg (object): The configuration object.
+        id (int): The ID of the CSV record to process.
+        pool (object): The database connection pool.
+    """
     filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
 
     # Creare un file temporaneo
diff --git a/src/utils/parsers/by_type/sisgeo_health.py b/src/utils/parsers/by_type/sisgeo_health.py
index fe6db3b..ba3c2ca 100644
--- a/src/utils/parsers/by_type/sisgeo_health.py
+++ b/src/utils/parsers/by_type/sisgeo_health.py
@@ -11,6 +11,21 @@ import logging
 logger = logging.getLogger(__name__)
 
 async def main_loader(cfg: object, id: int, pool: object) -> None:
+    """
+    Loads and processes CSV data specific to the 'sisgeo_health' type.
+
+    This function retrieves CSV data, writes it to a temporary file,
+    executes an external Python script ('sisgeoLoadScript.py') to process it,
+    and then updates the workflow status in the database.
+
+    Args:
+        cfg (object): The configuration object.
+        id (int): The ID of the CSV record to process.
+        pool (object): The database connection pool.
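+
+    Note: as in the sibling sisgeo loaders, the temporary CSV is created
+    with delete=False so that it persists for the external script to read.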
+ """ filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) # Creare un file temporaneo diff --git a/src/utils/parsers/by_type/sisgeo_readings.py b/src/utils/parsers/by_type/sisgeo_readings.py index fe6db3b..6774a64 100644 --- a/src/utils/parsers/by_type/sisgeo_readings.py +++ b/src/utils/parsers/by_type/sisgeo_readings.py @@ -11,7 +11,17 @@ import logging logger = logging.getLogger(__name__) async def main_loader(cfg: object, id: int, pool: object) -> None: + """ + Loads and processes CSV data specific to the 'sisgeo_readings' type. + This function retrieves CSV data, writes it to a temporary file, + executes an external Python script ('sisgeoLoadScript.py') to process it, + and then updates the workflow status in the database. + Args: + cfg (object): The configuration object. + id (int): The ID of the CSV record to process. + pool (object): The database connection pool. + """ filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) # Creare un file temporaneo with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file: diff --git a/src/utils/parsers/by_type/sorotecpini_co.py b/src/utils/parsers/by_type/sorotecpini_co.py index 1ebf312..8c7d2a1 100644 --- a/src/utils/parsers/by_type/sorotecpini_co.py +++ b/src/utils/parsers/by_type/sorotecpini_co.py @@ -11,6 +11,17 @@ import logging logger = logging.getLogger(__name__) async def main_loader(cfg: object, id: int, pool: object) -> None: + """ + Loads and processes CSV data specific to the 'sorotecpini_co' type. + + This function retrieves CSV data, writes it to a temporary file, + executes an external Python script ('sorotecPini.py') to process it, + and then updates the workflow status in the database. + Args: + cfg (object): The configuration object. + id (int): The ID of the CSV record to process. + pool (object): The database connection pool. + """ filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) # Creare un file temporaneo diff --git a/src/utils/parsers/by_type/stazionetotale_integrity_monitor.py b/src/utils/parsers/by_type/stazionetotale_integrity_monitor.py index a7c7dc0..a473537 100644 --- a/src/utils/parsers/by_type/stazionetotale_integrity_monitor.py +++ b/src/utils/parsers/by_type/stazionetotale_integrity_monitor.py @@ -11,6 +11,17 @@ import logging logger = logging.getLogger(__name__) async def main_loader(cfg: object, id: int, pool: object) -> None: + """ + Loads and processes CSV data specific to the 'stazionetotale_integrity_monitor' type. + + This function retrieves CSV data, writes it to a temporary file, + executes an external Python script ('TS_PiniScript.py') to process it, + and then updates the workflow status in the database. + Args: + cfg (object): The configuration object. + id (int): The ID of the CSV record to process. + pool (object): The database connection pool. + """ filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) # Creare un file temporaneo diff --git a/src/utils/parsers/by_type/stazionetotale_messpunktepini.py b/src/utils/parsers/by_type/stazionetotale_messpunktepini.py index a7c7dc0..83ced58 100644 --- a/src/utils/parsers/by_type/stazionetotale_messpunktepini.py +++ b/src/utils/parsers/by_type/stazionetotale_messpunktepini.py @@ -12,6 +12,17 @@ logger = logging.getLogger(__name__) async def main_loader(cfg: object, id: int, pool: object) -> None: + """ + Loads and processes CSV data specific to the 'stazionetotale_messpunktepini' type. 
+
+    This function retrieves CSV data, writes it to a temporary file,
+    executes an external Python script ('TS_PiniScript.py') to process it,
+    and then updates the workflow status in the database.
+
+    Args:
+        cfg (object): The configuration object.
+        id (int): The ID of the CSV record to process.
+        pool (object): The database connection pool.
+    """
     filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
     # Creare un file temporaneo
     with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file: