From 670972bd45d024fe27880a5c1ac83b3827c3caa4 Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 26 May 2025 22:38:19 +0200 Subject: [PATCH] fix x channels --- load_orchestrator.py | 28 ++++++++++++++-------------- utils/csv/data_preparation.py | 14 ++++++++------ utils/parsers/by_type/g201_g201.py | 16 ++-------------- utils/parsers/by_type/g801_mux.py | 16 ++-------------- utils/parsers/by_type/g802_mux.py | 4 ++-- 5 files changed, 28 insertions(+), 50 deletions(-) diff --git a/load_orchestrator.py b/load_orchestrator.py index 238f7dc..faec53d 100755 --- a/load_orchestrator.py +++ b/load_orchestrator.py @@ -39,7 +39,7 @@ async def worker(worker_id: int, queue: asyncio.Queue, cfg: object) -> None: logger.info(f"Worker {worker_id} - Inizio elaborazione") - record, success = await load_csv(cfg) + record, success = await load_csv(cfg, worker_id) if not record: logger.debug(f"Worker {worker_id} - Nessun record trovato") @@ -58,7 +58,7 @@ async def worker(worker_id: int, queue: asyncio.Queue, cfg: object) -> None: queue.task_done() -async def load_csv(cfg: object) -> tuple: +async def load_csv(cfg: object, worker_id: int) -> tuple: """ Cerca e carica un file CSV da elaborare dal database. @@ -70,12 +70,12 @@ async def load_csv(cfg: object) -> tuple: """ debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG) - logger.debug("Inizio ricerca nuovo CSV da elaborare") + logger.debug(f"Worker {worker_id} - Inizio ricerca nuovo CSV da elaborare") try: with connetti_db(cfg) as conn: cur = conn.cursor() - logger.debug("Connessione al database stabilita") + logger.debug(f"Worker {worker_id} - Connessione al database stabilita") query = f""" SELECT id, unit_type, tool_type, unit_name, tool_name @@ -83,16 +83,16 @@ async def load_csv(cfg: object) -> tuple: WHERE locked = 0 AND status = {CSV_RECEIVED} LIMIT 1 """ - logger.debug(f"Esecuzione query: {query}") + logger.debug(f"Worker {worker_id} - Esecuzione query: {query}") cur.execute(query) result = cur.fetchone() if result: id, unit_type, tool_type, unit_name, tool_name = result - logger.info(f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}") + logger.info(f"Worker {worker_id} - Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}") lock_query = f"UPDATE {cfg.dbname}.{cfg.dbrectable} SET locked = 1 WHERE id = {id}" - logger.debug(f"Lock del record: {lock_query}") + logger.debug(f"Worker {worker_id} - Esecuzione lock del record: {lock_query}") cur.execute(lock_query) conn.commit() @@ -104,14 +104,14 @@ async def load_csv(cfg: object) -> tuple: modulo = None for module_name in module_names: try: - logger.debug(f"Caricamento dinamico del modulo: {module_name}") + logger.debug(f"Worker {worker_id} - Caricamento dinamico del modulo: {module_name}") modulo = importlib.import_module(module_name) - logger.debug(f"Funzione 'main_loader' caricata dal modulo {module_name}") + logger.debug(f"Worker {worker_id} - Funzione 'main_loader' caricata dal modulo {module_name}") except (ImportError, AttributeError) as e: - logger.warning(f"Modulo {module_name} non trovato: {e}", exc_info=debug_mode) + logger.info(f"Worker {worker_id} - Modulo {module_name} non presente o non valido. {e}", exc_info=debug_mode) if not modulo: - logger.error(f"Nessun modulo trovato {module_names}") + logger.error(f"Worker {worker_id} - Nessun modulo trovato {module_names}") return True, False # Ottiene la funzione 'main_loader' dal modulo @@ -120,14 +120,14 @@ async def load_csv(cfg: object) -> tuple: # Esegui la funzione await funzione(cfg, id) - logger.info(f"Elaborazione completata per ID={id}") + logger.info(f"Worker {worker_id} - Elaborazione completata per ID={id}") return True, True else: - logger.debug("Nessun record disponibile per l'elaborazione") + logger.debug(f"Worker {worker_id} - Nessun record disponibile per l'elaborazione") return False, False except mysql.connector.Error as e: - logger.error(f"Errore di database: {e}", exc_info=debug_mode) + logger.error(f"Worker {worker_id} - Errore database: {e}", exc_info=debug_mode) return False, False diff --git a/utils/csv/data_preparation.py b/utils/csv/data_preparation.py index b666b28..dbdd7fc 100644 --- a/utils/csv/data_preparation.py +++ b/utils/csv/data_preparation.py @@ -99,16 +99,18 @@ def make_ain_din_matrix(cfg: object, id: int) -> list: for riga in [riga for riga in righe if re.match(pattern, riga)]: timestamp, batlevel, temperature, analog_input1, analog_input2, digital_input1, digital_input2 = riga.split(';') EventDate, EventTime = timestamp.split(' ') + if any(node_ains): + for node_num, analog_act in enumerate([analog_input1, analog_input2], start=1): + matrice_valori.append([UnitName, ToolNameID, node_num, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + [analog_act] + ([None] * (19 - 1))) + else: + logger.info(f"Nessun Ingresso analogico per {UnitName} {ToolNameID}") if any(node_dins): - for node_num, digital_act in enumerate([digital_input1, digital_input2], start=1): + start_node = 3 if any(node_ains) else 1 + for node_num, digital_act in enumerate([digital_input1, digital_input2], start=start_node): matrice_valori.append([UnitName, ToolNameID, node_num, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + [digital_act] + ([None] * (19 - 1))) else: logger.info(f"Nessun Ingresso digitale per {UnitName} {ToolNameID}") - if any(node_ains): - for node_num, analog_act in enumerate([analog_input1, analog_input2], start=1): - matrice_valori.append([UnitName, ToolNameID, node_num, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + [analog_act] + ([None] * (19 - 1))) - else: - logger.info(f"Nessun Ingresso analogico per {UnitName} {ToolNameID}") + return matrice_valori def make_channels_matrix(cfg: object, id: int) -> list: diff --git a/utils/parsers/by_type/g201_g201.py b/utils/parsers/by_type/g201_g201.py index ce8c3ec..2f43f9c 100644 --- a/utils/parsers/by_type/g201_g201.py +++ b/utils/parsers/by_type/g201_g201.py @@ -1,16 +1,4 @@ -#!.venv/bin/python -# Import necessary modules -from utils.database.loader_action import load_data, update_status -from utils.database import DATA_LOADED -from utils.csv.data_preparation import make_matrix -import logging +from utils.csv.loaders import main_loader as channels_main_loader -logger = logging.getLogger(__name__) - -# Define the main function for loading data async def main_loader(cfg: object, id: int) -> None: - # Create a matrix of values from the data - matrice_valori = make_matrix(cfg, id) - # Load the data into the database - if load_data(cfg, matrice_valori): - update_status(cfg, id, DATA_LOADED) \ No newline at end of file + await channels_main_loader(cfg, id, "channels") \ No newline at end of file diff --git a/utils/parsers/by_type/g801_mux.py b/utils/parsers/by_type/g801_mux.py index ce8c3ec..2f43f9c 100644 --- a/utils/parsers/by_type/g801_mux.py +++ b/utils/parsers/by_type/g801_mux.py @@ -1,16 +1,4 @@ -#!.venv/bin/python -# Import necessary modules -from utils.database.loader_action import load_data, update_status -from utils.database import DATA_LOADED -from utils.csv.data_preparation import make_matrix -import logging +from utils.csv.loaders import main_loader as channels_main_loader -logger = logging.getLogger(__name__) - -# Define the main function for loading data async def main_loader(cfg: object, id: int) -> None: - # Create a matrix of values from the data - matrice_valori = make_matrix(cfg, id) - # Load the data into the database - if load_data(cfg, matrice_valori): - update_status(cfg, id, DATA_LOADED) \ No newline at end of file + await channels_main_loader(cfg, id, "channels") \ No newline at end of file diff --git a/utils/parsers/by_type/g802_mux.py b/utils/parsers/by_type/g802_mux.py index fd969a8..2f43f9c 100644 --- a/utils/parsers/by_type/g802_mux.py +++ b/utils/parsers/by_type/g802_mux.py @@ -1,4 +1,4 @@ -from .g801_mux import main_loader as g801_mux_main_loader +from utils.csv.loaders import main_loader as channels_main_loader async def main_loader(cfg: object, id: int) -> None: - await g801_mux_main_loader(cfg, id) \ No newline at end of file + await channels_main_loader(cfg, id, "channels") \ No newline at end of file