From 991eb6900d9f490f796d4658ccfe3c04479707d6 Mon Sep 17 00:00:00 2001
From: alex
Date: Sun, 1 Jun 2025 21:33:03 +0200
Subject: [PATCH] More fixes

---
 env/db.ini                               |  1 +
 env/load.ini                             |  2 +-
 load_orchestrator.py                     | 63 ++++++++++++++---------
 utils/config/loader_ftp_csv.py           |  2 +
 utils/config/loader_load_data.py         |  1 +
 utils/config/loader_matlab_elab.py       |  1 +
 utils/csv/data_preparation.py            | 64 +++++++++++++++++++++++-
 utils/csv/loaders.py                     | 10 ++--
 utils/database/loader_action.py          | 48 +++++++++++++-----
 utils/parsers/by_type/cr1000x_cr1000x.py |  4 +-
 utils/parsers/by_type/d2w_d2w.py         |  4 +-
 utils/parsers/by_type/g801_musa.py       | 18 ++-----
 utils/parsers/by_type/g802_gd.py         | 18 ++-----
 utils/parsers/by_type/gs1_gs1.py         |  4 +-
 utils/parsers/by_type/tlp_tlp.py         | 18 ++-----
 15 files changed, 165 insertions(+), 93 deletions(-)

diff --git a/env/db.ini b/env/db.ini
index 78a3531..366d05f 100644
--- a/env/db.ini
+++ b/env/db.ini
@@ -7,6 +7,7 @@
 user = root
 password = batt1l0
 dbName = ase_lar
+maxRetries = 5

 [tables]
 userTableName = virtusers
diff --git a/env/load.ini b/env/load.ini
index 39ce66c..2f11ff0 100644
--- a/env/load.ini
+++ b/env/load.ini
@@ -2,4 +2,4 @@
 logFilename = ./load_raw_data.log

 [threads]
-max_num = 10
\ No newline at end of file
+max_num = 5
\ No newline at end of file
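The new maxRetries key feeds the retry loop added to utils/database/loader_action.py below; each Config class reads it with c.getint("db", "maxRetries"). A minimal sketch of reading it defensively — the fallback keyword is an illustrative assumption for this sketch, not something the patch adds:

    import configparser

    c = configparser.ConfigParser()
    c.read("env/db.ini")
    # getint() parses the value as an integer; the fallback (assumed here)
    # guards against ini files that predate this key.
    max_retries = c.getint("db", "maxRetries", fallback=5)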
diff --git a/load_orchestrator.py b/load_orchestrator.py
index d34faab..364f957 100755
--- a/load_orchestrator.py
+++ b/load_orchestrator.py
@@ -6,11 +6,21 @@ import importlib
 import asyncio
 import os
 import aiomysql
+import contextvars

 # Import custom modules for configuration and database connection
 from utils.config import loader_load_data as setting
 from utils.database import CSV_RECEIVED

+# Context variable identifying the worker that owns the current task
+worker_context = contextvars.ContextVar('worker_id', default='00')
+
+# Custom formatter that stamps every log record with the worker_id
+class WorkerFormatter(logging.Formatter):
+    def format(self, record):
+        record.worker_id = worker_context.get()
+        return super().format(record)
+
 # Initialize the logger for this module
 logger = logging.getLogger()

@@ -45,34 +55,36 @@ async def get_next_csv_atomic(pool, table_name):
     return result

 async def worker(worker_id: int, cfg: object, pool) -> None:
+    # Bind the logging context for this worker
+    worker_context.set(f"W{worker_id}")
+
     debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
-    logger.info(f"Worker {worker_id} - Avviato")
+    logger.info("Avviato")
     while True:
         try:
-            logger.info(f"Worker {worker_id} - Inizio elaborazione")
+            logger.info("Inizio elaborazione")
             record = await get_next_csv_atomic(pool, cfg.dbrectable)
             if record:
-                success = await load_csv(record, cfg, worker_id, pool)
+                success = await load_csv(record, cfg, pool)
                 if not success:
-                    logger.error(f"Worker {worker_id} - Errore durante l'elaborazione")
+                    logger.error("Errore durante l'elaborazione")
                 await asyncio.sleep(CSV_PROCESSING_DELAY)
             else:
                 await asyncio.sleep(NO_RECORD_SLEEP)
         except Exception as e:
-            logger.error(f"Worker {worker_id} - Errore durante l'esecuzione: {e}", exc_info=debug_mode)
+            logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
             await asyncio.sleep(1)

-async def load_csv(record: tuple, cfg: object, worker_id: int, pool) -> bool:
-
+async def load_csv(record: tuple, cfg: object, pool) -> bool:
     debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)

-    logger.debug(f"Worker {worker_id} - Inizio ricerca nuovo CSV da elaborare")
+    logger.debug("Inizio ricerca nuovo CSV da elaborare")
     id, unit_type, tool_type, unit_name, tool_name = record
-    logger.info(f"Worker {worker_id} - Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}")
+    logger.info(f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}")

     # Build the candidate module names for dynamic loading
     module_names = [f'utils.parsers.by_name.{unit_name.lower()}_{tool_name.lower()}',
                     f'utils.parsers.by_type.{unit_type.lower()}_{tool_type.lower()}']
@@ -82,30 +94,26 @@ async def load_csv(record: tuple, cfg: object, worker_id: int, pool) -> bool:
     modulo = None
     for module_name in module_names:
         try:
-            logger.debug(f"Worker {worker_id} - Caricamento dinamico del modulo: {module_name}")
+            logger.debug(f"Caricamento dinamico del modulo: {module_name}")
             modulo = importlib.import_module(module_name)
-            logger.debug(f"Worker {worker_id} - Funzione 'main_loader' caricata dal modulo {module_name}")
+            logger.debug(f"Funzione 'main_loader' caricata dal modulo {module_name}")
             break
         except (ImportError, AttributeError) as e:
-            logger.info(f"Worker {worker_id} - Modulo {module_name} non presente o non valido. {e}", exc_info=debug_mode)
+            logger.debug(f"Modulo {module_name} non presente o non valido. {e}", exc_info=debug_mode)

     if not modulo:
-        logger.error(f"Worker {worker_id} - Nessun modulo trovato {module_names}")
+        logger.error(f"Nessun modulo trovato {module_names}")
         return False

     # Get the 'main_loader' function from the module
     funzione = getattr(modulo, "main_loader")

     # Run the function
-
-    logger.info(f"Worker {worker_id} - Elaborazione con modulo {modulo} per ID={id}")
+    logger.info(f"Elaborazione con modulo {modulo} per ID={id}")
     await funzione(cfg, id, pool)
-    logger.info(f"Worker {worker_id} - Elaborazione completata per ID={id}")
+    logger.info(f"Elaborazione completata per ID={id}")

     return True

-
-
 async def main():
     """Main function: starts the workers and runs the main loop."""
     logger.info("Avvio del sistema...")
@@ -118,13 +126,19 @@ async def main():
     log_level = os.getenv("LOG_LEVEL", "INFO").upper()
     debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)

-    logging.basicConfig(
-        format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
-        filename=cfg.logfilename,
-        level=log_level,
+    # Configure logging with the worker-aware formatter
+    handler = logging.FileHandler(cfg.logfilename)
+    formatter = WorkerFormatter(
+        "%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s"
     )
-    logger.info("Logging configurato correttamente")
+    handler.setFormatter(formatter)

+    # Remove any existing handlers and install ours
+    logger.handlers.clear()
+    logger.addHandler(handler)
+    logger.setLevel(getattr(logging, log_level, logging.INFO))
+
+    logger.info("Logging configurato correttamente")

     # Maximum number of concurrent workers
     logger.info(f"Avvio di {cfg.max_threads} worker concorrenti")
@@ -158,6 +172,5 @@
     except Exception as e:
         logger.error(f"Errore principale: {e}", exc_info=debug_mode)

-
 if __name__ == "__main__":
     asyncio.run(main())
\ No newline at end of file
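The contextvars/WorkerFormatter pair above lets every concurrent worker log through the single root logger while each line still carries its own tag: asyncio gives each Task a copy of the current context, so worker_context.set() in one task never leaks into another. A self-contained sketch of the same pattern, with a stream handler standing in for the FileHandler used here:

    import asyncio
    import contextvars
    import logging

    worker_context = contextvars.ContextVar('worker_id', default='00')

    class WorkerFormatter(logging.Formatter):
        def format(self, record):
            record.worker_id = worker_context.get()  # task-local value
            return super().format(record)

    handler = logging.StreamHandler()
    handler.setFormatter(WorkerFormatter("W%(worker_id)s %(levelname)s: %(message)s"))
    logging.getLogger().addHandler(handler)
    logging.getLogger().setLevel(logging.INFO)

    async def worker(n):
        worker_context.set(f"{n:02d}")  # each Task mutates only its own copy
        logging.info("started")

    async def main():
        await asyncio.gather(worker(1), worker(2))

    asyncio.run(main())  # prints "W01 INFO: started" and "W02 INFO: started"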
diff --git a/utils/config/loader_ftp_csv.py b/utils/config/loader_ftp_csv.py
index 912d5ac..a8c0a47 100644
--- a/utils/config/loader_ftp_csv.py
+++ b/utils/config/loader_ftp_csv.py
@@ -32,6 +32,8 @@ class Config:
         self.dbuser = c.get("db", "user")
         self.dbpass = c.get("db", "password")
         self.dbname = c.get("db", "dbName")
+        self.max_retries = c.getint("db", "maxRetries")
+
         # Tables
         self.dbusertable = c.get("tables", "userTableName")
diff --git a/utils/config/loader_load_data.py b/utils/config/loader_load_data.py
index 69f4d68..8d21e78 100644
--- a/utils/config/loader_load_data.py
+++ b/utils/config/loader_load_data.py
@@ -21,6 +21,7 @@ class Config:
         self.dbuser = c.get("db", "user")
         self.dbpass = c.get("db", "password")
         self.dbname = c.get("db", "dbName")
+        self.max_retries = c.getint("db", "maxRetries")

         # Tables
         self.dbusertable = c.get("tables", "userTableName")
diff --git a/utils/config/loader_matlab_elab.py b/utils/config/loader_matlab_elab.py
index 74ef69c..edcd362 100644
--- a/utils/config/loader_matlab_elab.py
+++ b/utils/config/loader_matlab_elab.py
@@ -21,6 +21,7 @@ class Config:
         self.dbuser = c.get("db", "user")
         self.dbpass = c.get("db", "password")
         self.dbname = c.get("db", "dbName")
+        self.max_retries = c.getint("db", "maxRetries")

         # Tables
         self.dbusertable = c.get("tables", "userTableName")
diff --git a/utils/csv/data_preparation.py b/utils/csv/data_preparation.py
index 49808e3..34c88df 100644
--- a/utils/csv/data_preparation.py
+++ b/utils/csv/data_preparation.py
@@ -44,6 +44,9 @@ async def make_pipe_sep_matrix(cfg: object, id: int, pool) -> list:
     for riga in [riga for riga in righe if ';|;' in riga]:
         timestamp, batlevel, temperature, rilevazioni = riga.split(';',3)
         EventDate, EventTime = timestamp.split(' ')
+        if batlevel == '|':
+            batlevel = temperature
+            temperature, rilevazioni = rilevazioni.split(';',1)
         valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;')  # Strip leading '|;' and any trailing ';', then split on ';|;'
         for num_nodo, valori_nodo in enumerate(valori_nodi, start=1):
             valori = valori_nodo.split(';')
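The three added lines appear to handle rows where a node separator shows up immediately after the timestamp, so split(';',3) lands the battery value in temperature and the real temperature at the head of rilevazioni; the guard shifts everything back by one field. A worked example with made-up values:

    riga = "2025-06-01 10:00:00;|;12.4;21.5;|;0.1;0.2;|;0.3;0.4"

    timestamp, batlevel, temperature, rilevazioni = riga.split(';', 3)
    # batlevel == '|', so the fields are shifted one place to the right
    if batlevel == '|':
        batlevel = temperature                                # '12.4'
        temperature, rilevazioni = rilevazioni.split(';', 1)  # '21.5', '|;0.1;...'

    valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;')
    # -> ['0.1;0.2', '0.3;0.4'], one entry per node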
@@ -102,7 +105,7 @@ async def make_channels_matrix(cfg: object, id: int, pool) -> list:
     righe = ToolData.splitlines()
     matrice_valori = []
     for riga in [riga for riga in righe if ';|;' in riga]:
-        timestamp, batlevel, temperature, rilevazioni = riga.split(';',3)
+        timestamp, batlevel, temperature, rilevazioni = riga.replace(';|;',';').split(';',3)
         EventDate, EventTime = timestamp.split(' ')
         valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|']
         valori_iter = iter(valori_splitted)
@@ -113,3 +116,62 @@ async def make_channels_matrix(cfg: object, id: int, pool) -> list:
             matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori))))

     return matrice_valori
+
+async def make_musa_matrix(cfg: object, id: int, pool) -> list:
+    UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
+    node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
+    righe = ToolData.splitlines()
+    matrice_valori = []
+    for riga in [riga for riga in righe if ';|;' in riga]:
+        timestamp, batlevel, rilevazioni = riga.replace(';|;',';').split(';',2)
+        if timestamp == '':
+            continue
+        EventDate, EventTime = timestamp.split(' ')
+        temperature = rilevazioni.split(';')[0]
+        logger.debug(f'{temperature}, {rilevazioni}')
+        valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|']
+        valori_iter = iter(valori_splitted)
+
+        valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels]
+
+        for num_nodo, valori in enumerate(valori_nodi, start=1):
+            matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori))))
+
+    return matrice_valori
+
+
+async def make_tlp_matrix(cfg: object, id: int, pool) -> list:
+    UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
+    righe = ToolData.splitlines()
+    valori_x_nodo = 2
+    matrice_valori = []
+    for riga in righe:
+        timestamp, batlevel, temperature, barometer, rilevazioni = riga.split(';',4)
+        EventDate, EventTime = timestamp.split(' ')
+        lista_rilevazioni = rilevazioni.strip(';').split(';')
+        lista_rilevazioni.append(barometer)
+        valori_nodi = [lista_rilevazioni[i:i + valori_x_nodo] for i in range(0, len(lista_rilevazioni), valori_x_nodo)]
+        for num_nodo, valori in enumerate(valori_nodi, start=1):
+            matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori))))
+    return matrice_valori
+
+
+async def make_gd_matrix(cfg: object, id: int, pool) -> list:
+    UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
+    righe = ToolData.splitlines()
+    matrice_valori = []
+    pattern = r'^-\d*dB$'
+    for riga in [riga for riga in righe if ';|;' in riga]:
+        timestamp, batlevel, temperature, rilevazioni = riga.split(';',3)
+        EventDate, EventTime = timestamp.split(' ')
+        if batlevel == '|':
+            batlevel = temperature
+            temperature, rilevazioni = rilevazioni.split(';',1)
+        if re.match(pattern, rilevazioni):
+            valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;')  # Strip leading '|;' and any trailing ';', then split on ';|;'
+            for num_nodo, valori_nodo in enumerate(valori_nodi, start=1):
+                valori = valori_nodo.split(';')
+                matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori))))
+
+    return matrice_valori
\ No newline at end of file
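make_musa_matrix relies on the same iterator trick as the existing make_channels_matrix: islice consumes from one shared iterator, so consecutive slices carve the flat reading list into per-node groups of varying width. In isolation, with illustrative channel counts:

    from itertools import islice

    valori_splitted = ['1.1', '1.2', '2.1', '2.2', '2.3', '3.1']
    node_channels = [2, 3, 1]   # channels per node, as get_nodes_type would return

    valori_iter = iter(valori_splitted)
    valori_nodi = [list(islice(valori_iter, n)) for n in node_channels]
    # -> [['1.1', '1.2'], ['2.1', '2.2', '2.3'], ['3.1']]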
- logging.info("Data loaded.") - rc = True - except Exception as e: - await conn.rollback() - logging.error(f"Error: {e}.") - rc = False - finally: - return rc + rc = False + for attempt in range(cfg.max_retries): + try: + await cur.executemany(sql_insert_RAWDATA, matrice_valori) + await conn.commit() + logging.info("Data loaded.") + rc = True + except Exception as e: + await conn.rollback() + logging.error(f"Error: {e}.") + + if e.args[0] == 1213: # Deadlock detected + logging.warning(f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}") + + if attempt < cfg.max_retries - 1: + delay = (2 * attempt) + await asyncio.sleep(delay) + continue + else: + logging.error("Max retry attempts reached for deadlock") + raise + finally: + return rc async def update_status(cfg: object, id: int, status: int, pool) -> None: async with pool.acquire() as conn: async with conn.cursor() as cur: try: - await cur.execute(f'update {cfg.dbrectable} set locked = 0, status = {status}, {timestamp_cols[status]} = now() where id = {id}') + await cur.execute(f'update {cfg.dbrectable} set status = {status}, {timestamp_cols[status]} = now() where id = {id}') await conn.commit() logging.info("Status updated.") except Exception as e: await conn.rollback() logging.error(f'Error: {e}') +async def unlock(cfg: object, id: int, pool) -> None: + async with pool.acquire() as conn: + async with conn.cursor() as cur: + try: + await cur.execute(f'update {cfg.dbrectable} set locked = 0 where id = {id}') + await conn.commit() + logging.info(f"id {id} unlocked.") + except Exception as e: + await conn.rollback() + logging.error(f'Error: {e}') + async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool) -> tuple: async with pool.acquire() as conn: async with conn.cursor() as cur: diff --git a/utils/parsers/by_type/cr1000x_cr1000x.py b/utils/parsers/by_type/cr1000x_cr1000x.py index c4d4170..c6948eb 100644 --- a/utils/parsers/by_type/cr1000x_cr1000x.py +++ b/utils/parsers/by_type/cr1000x_cr1000x.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader -async def main_loader(cfg: object, id: int) -> None: - return pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file +async def main_loader(cfg: object, id: int, pool) -> None: + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/by_type/d2w_d2w.py b/utils/parsers/by_type/d2w_d2w.py index 87d55d2..c6948eb 100644 --- a/utils/parsers/by_type/d2w_d2w.py +++ b/utils/parsers/by_type/d2w_d2w.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader -async def main_loader(cfg: object, id: int) -> None: - await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file +async def main_loader(cfg: object, id: int, pool) -> None: + await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/by_type/g801_musa.py b/utils/parsers/by_type/g801_musa.py index ce8c3ec..ba5dc13 100644 --- a/utils/parsers/by_type/g801_musa.py +++ b/utils/parsers/by_type/g801_musa.py @@ -1,16 +1,4 @@ -#!.venv/bin/python -# Import necessary modules -from utils.database.loader_action import load_data, update_status -from utils.database import DATA_LOADED -from utils.csv.data_preparation import make_matrix -import logging +from utils.csv.loaders import main_loader as musa_main_loader -logger = logging.getLogger(__name__) - -# Define the main function for loading data -async def main_loader(cfg: object, id: 
diff --git a/utils/parsers/by_type/cr1000x_cr1000x.py b/utils/parsers/by_type/cr1000x_cr1000x.py
index c4d4170..c6948eb 100644
--- a/utils/parsers/by_type/cr1000x_cr1000x.py
+++ b/utils/parsers/by_type/cr1000x_cr1000x.py
@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as pipe_sep_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    return pipe_sep_main_loader(cfg, id, "pipe_separator")
\ No newline at end of file
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
\ No newline at end of file
diff --git a/utils/parsers/by_type/d2w_d2w.py b/utils/parsers/by_type/d2w_d2w.py
index 87d55d2..c6948eb 100644
--- a/utils/parsers/by_type/d2w_d2w.py
+++ b/utils/parsers/by_type/d2w_d2w.py
@@ -1,4 +1,4 @@
 from utils.csv.loaders import main_loader as pipe_sep_main_loader

-async def main_loader(cfg: object, id: int) -> None:
-    await pipe_sep_main_loader(cfg, id, "pipe_separator")
\ No newline at end of file
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
\ No newline at end of file
diff --git a/utils/parsers/by_type/g801_musa.py b/utils/parsers/by_type/g801_musa.py
index ce8c3ec..ba5dc13 100644
--- a/utils/parsers/by_type/g801_musa.py
+++ b/utils/parsers/by_type/g801_musa.py
@@ -1,16 +1,4 @@
-#!.venv/bin/python
-# Import necessary modules
-from utils.database.loader_action import load_data, update_status
-from utils.database import DATA_LOADED
-from utils.csv.data_preparation import make_matrix
-import logging
+from utils.csv.loaders import main_loader as musa_main_loader

-logger = logging.getLogger(__name__)
-
-# Define the main function for loading data
-async def main_loader(cfg: object, id: int) -> None:
-    # Create a matrix of values from the data
-    matrice_valori = make_matrix(cfg, id)
-    # Load the data into the database
-    if load_data(cfg, matrice_valori):
-        update_status(cfg, id, DATA_LOADED)
\ No newline at end of file
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await musa_main_loader(cfg, id, pool, "musa")
\ No newline at end of file
diff --git a/utils/parsers/by_type/g802_gd.py b/utils/parsers/by_type/g802_gd.py
index ce8c3ec..c90d2e5 100644
--- a/utils/parsers/by_type/g802_gd.py
+++ b/utils/parsers/by_type/g802_gd.py
@@ -1,16 +1,4 @@
-#!.venv/bin/python
-# Import necessary modules
-from utils.database.loader_action import load_data, update_status
-from utils.database import DATA_LOADED
-from utils.csv.data_preparation import make_matrix
-import logging
+from utils.csv.loaders import main_loader as gd_main_loader

-logger = logging.getLogger(__name__)
-
-# Define the main function for loading data
-async def main_loader(cfg: object, id: int) -> None:
-    # Create a matrix of values from the data
-    matrice_valori = make_matrix(cfg, id)
-    # Load the data into the database
-    if load_data(cfg, matrice_valori):
-        update_status(cfg, id, DATA_LOADED)
\ No newline at end of file
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await gd_main_loader(cfg, id, pool, "gd")
\ No newline at end of file
diff --git a/utils/parsers/by_type/gs1_gs1.py b/utils/parsers/by_type/gs1_gs1.py
index 9814dea..3a1d09e 100644
--- a/utils/parsers/by_type/gs1_gs1.py
+++ b/utils/parsers/by_type/gs1_gs1.py
@@ -1,4 +1,4 @@
-from .tlp_tlp import main_loader as tlp_tlp_main_loader
+from utils.csv.loaders import main_loader as tlp_main_loader

 async def main_loader(cfg: object, id: int, pool) -> None:
-    await tlp_tlp_main_loader(cfg, id)
\ No newline at end of file
+    await tlp_main_loader(cfg, id, pool, "tlp")
\ No newline at end of file
diff --git a/utils/parsers/by_type/tlp_tlp.py b/utils/parsers/by_type/tlp_tlp.py
index ce8c3ec..3a1d09e 100644
--- a/utils/parsers/by_type/tlp_tlp.py
+++ b/utils/parsers/by_type/tlp_tlp.py
@@ -1,16 +1,4 @@
-#!.venv/bin/python
-# Import necessary modules
-from utils.database.loader_action import load_data, update_status
-from utils.database import DATA_LOADED
-from utils.csv.data_preparation import make_matrix
-import logging
+from utils.csv.loaders import main_loader as tlp_main_loader

-logger = logging.getLogger(__name__)
-
-# Define the main function for loading data
-async def main_loader(cfg: object, id: int) -> None:
-    # Create a matrix of values from the data
-    matrice_valori = make_matrix(cfg, id)
-    # Load the data into the database
-    if load_data(cfg, matrice_valori):
-        update_status(cfg, id, DATA_LOADED)
\ No newline at end of file
+async def main_loader(cfg: object, id: int, pool) -> None:
+    await tlp_main_loader(cfg, id, pool, "tlp")
\ No newline at end of file
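With every by_type parser reduced to a thin wrapper, the orchestrator's load_csv() resolves parsers purely by naming convention: a site-specific utils.parsers.by_name module wins over the generic utils.parsers.by_type one, and both expose the same main_loader coroutine. A minimal sketch of that fallback dispatch (the dispatch helper and the module names are illustrative, not part of the patch):

    import importlib

    async def dispatch(cfg, id, pool):
        module_names = [
            'utils.parsers.by_name.myunit_mytool',  # site-specific override (example name)
            'utils.parsers.by_type.g801_musa',      # generic fallback by hardware type
        ]
        for module_name in module_names:
            try:
                modulo = importlib.import_module(module_name)
            except ImportError:
                continue
            # Every parser module exposes the same coroutine entry point.
            await modulo.main_loader(cfg, id, pool)
            return True
        return False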