From 95c8ced201e63a120a44b613d60aed131223c565 Mon Sep 17 00:00:00 2001 From: alex Date: Sun, 25 May 2025 23:23:00 +0200 Subject: [PATCH] loc ok --- elab_orchestrator.py | 3 ++- load_orchestrator.py | 7 +++---- utils/csv/data_preparation.py | 25 +++++++++++++++++-------- utils/csv/loaders.py | 5 ++++- utils/database/__init__.py | 3 +++ utils/database/loader_action.py | 7 +++---- utils/parsers/by_type/d2w_d2w.py | 2 +- utils/parsers/by_type/g201_g201.py | 3 ++- utils/parsers/by_type/g301_g301.py | 2 +- utils/parsers/by_type/g801_iptm.py | 2 +- utils/parsers/by_type/g801_loc.py | 2 +- utils/parsers/by_type/g801_mums.py | 2 +- utils/parsers/by_type/g801_musa.py | 3 ++- utils/parsers/by_type/g801_mux.py | 3 ++- utils/parsers/by_type/g802_dsas.py | 2 +- utils/parsers/by_type/g802_gd.py | 3 ++- utils/parsers/by_type/g802_loc.py | 2 +- utils/parsers/by_type/g802_modb.py | 2 +- utils/parsers/by_type/g802_mums.py | 2 +- utils/parsers/by_type/g802_mux.py | 2 +- utils/parsers/by_type/gs1_gs1.py | 4 ++-- utils/parsers/by_type/hortus_hortus.py | 2 +- utils/parsers/{ => by_type}/tlp_tlp.py | 3 ++- 23 files changed, 55 insertions(+), 36 deletions(-) rename utils/parsers/{ => by_type}/tlp_tlp.py (92%) diff --git a/elab_orchestrator.py b/elab_orchestrator.py index cf24112..a6e3cc8 100755 --- a/elab_orchestrator.py +++ b/elab_orchestrator.py @@ -11,7 +11,8 @@ import subprocess # Import custom modules for configuration and database connection from utils.config import loader_ftp_csv as setting from utils.database.connection import connetti_db -from utils.database.loader_action import DATA_LOADED, get_matlab_cmd +from utils.database.loader_action import get_matlab_cmd +from utils.database import DATA_LOADED # Initialize the logger for this module logger = logging.getLogger(__name__) diff --git a/load_orchestrator.py b/load_orchestrator.py index 726684e..238f7dc 100755 --- a/load_orchestrator.py +++ b/load_orchestrator.py @@ -10,7 +10,7 @@ import os # Import custom modules for configuration and database connection from utils.config import loader_load_data as setting from utils.database.connection import connetti_db -from utils.database.loader_action import CSV_RECEIVED +from utils.database import CSV_RECEIVED # Initialize the logger for this module logger = logging.getLogger() @@ -49,7 +49,7 @@ async def worker(worker_id: int, queue: asyncio.Queue, cfg: object) -> None: await asyncio.sleep(CSV_PROCESSING_DELAY) else: logger.debug(f"Worker {worker_id} - Elaborazione completata correttamente") - await asyncio.sleep(CSV_PROCESSING_DELAY*worker_id) + await asyncio.sleep(CSV_PROCESSING_DELAY) # Segnala che il lavoro รจ completato queue.task_done() @@ -107,9 +107,8 @@ async def load_csv(cfg: object) -> tuple: logger.debug(f"Caricamento dinamico del modulo: {module_name}") modulo = importlib.import_module(module_name) logger.debug(f"Funzione 'main_loader' caricata dal modulo {module_name}") - return True, True except (ImportError, AttributeError) as e: - logger.error(f"Errore nel caricamento del modulo {module_name}: {e}", exc_info=debug_mode) + logger.warning(f"Modulo {module_name} non trovato: {e}", exc_info=debug_mode) if not modulo: logger.error(f"Nessun modulo trovato {module_names}") diff --git a/utils/csv/data_preparation.py b/utils/csv/data_preparation.py index 7ca8453..b666b28 100644 --- a/utils/csv/data_preparation.py +++ b/utils/csv/data_preparation.py @@ -91,19 +91,29 @@ def make_ain_din_matrix(cfg: object, id: int) -> list: list contains data fields similar to `make_matrix`, adjusted for LOC data. """ UnitName, ToolNameID, ToolData = get_data(cfg, id) + node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName) righe = ToolData.splitlines() matrice_valori = [] - pattern = r'(?:\d{4}/\d{2}/\d{2}|\d{2}/\d{2}/\d{4}) \d{2}:\d{2}:\d{2}(;[^;]+)+' - for riga in [riga for riga in righe if re.match(pattern, riga)]: - timestamp, battery_voltage, unit_temperature, analog_input1, analog_input2, digital_input1, digital_input2 = riga.split(';') - event_date, event_time = timestamp.split(' ') - valori = [analog_input1, analog_input2, digital_input1, digital_input2] - matrice_valori.append([UnitName, ToolNameID, 1, date_check.conforma_data(event_date), event_time, battery_voltage, unit_temperature] + valori + ([None] * (19 - len(valori)))) - + pattern = r'^(?:\d{4}\/\d{2}\/\d{2}|\d{2}\/\d{2}\/\d{4}) \d{2}:\d{2}:\d{2}(?:;\d+\.\d+){2}(?:;\d+){4}$' + if node_ains or node_dins: + for riga in [riga for riga in righe if re.match(pattern, riga)]: + timestamp, batlevel, temperature, analog_input1, analog_input2, digital_input1, digital_input2 = riga.split(';') + EventDate, EventTime = timestamp.split(' ') + if any(node_dins): + for node_num, digital_act in enumerate([digital_input1, digital_input2], start=1): + matrice_valori.append([UnitName, ToolNameID, node_num, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + [digital_act] + ([None] * (19 - 1))) + else: + logger.info(f"Nessun Ingresso digitale per {UnitName} {ToolNameID}") + if any(node_ains): + for node_num, analog_act in enumerate([analog_input1, analog_input2], start=1): + matrice_valori.append([UnitName, ToolNameID, node_num, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + [analog_act] + ([None] * (19 - 1))) + else: + logger.info(f"Nessun Ingresso analogico per {UnitName} {ToolNameID}") return matrice_valori def make_channels_matrix(cfg: object, id: int) -> list: UnitName, ToolNameID, ToolData = get_data(cfg, id) + node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName) righe = ToolData.splitlines() matrice_valori = [] for riga in [riga for riga in righe if ';|;' in riga]: @@ -111,7 +121,6 @@ def make_channels_matrix(cfg: object, id: int) -> list: EventDate, EventTime = timestamp.split(' ') valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|'] valori_iter = iter(valori_splitted) - node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName) valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels] diff --git a/utils/csv/loaders.py b/utils/csv/loaders.py index 3d6d455..e1b6f8b 100644 --- a/utils/csv/loaders.py +++ b/utils/csv/loaders.py @@ -1,4 +1,5 @@ -from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.database.loader_action import load_data, update_status +from utils.database import DATA_LOADED from utils.csv.data_preparation import make_pipe_sep_matrix, make_ain_din_matrix, make_channels_matrix import logging @@ -15,6 +16,8 @@ async def main_loader(cfg: object, id: int, action: str) -> None: function_to_call = type_matrix_mapping[action] # Create a matrix of values from the data matrice_valori = function_to_call(cfg, id) + + logger.info("matrice valori creata") # Load the data into the database if load_data(cfg, matrice_valori): update_status(cfg, id, DATA_LOADED) diff --git a/utils/database/__init__.py b/utils/database/__init__.py index e69de29..45c2ad3 100644 --- a/utils/database/__init__.py +++ b/utils/database/__init__.py @@ -0,0 +1,3 @@ +CSV_RECEIVED = 0 +DATA_LOADED = 1 +DATA_ELABORATED = 2 \ No newline at end of file diff --git a/utils/database/loader_action.py b/utils/database/loader_action.py index d2b2962..24f7c2f 100644 --- a/utils/database/loader_action.py +++ b/utils/database/loader_action.py @@ -4,14 +4,13 @@ import logging logger = logging.getLogger(__name__) -CSV_RECEIVED = 0 -DATA_LOADED = 1 -DATA_ELABORATED = 2 - timestamp_cols = ['inserted_at', 'loaded_at', 'elaborated_at'] def load_data(cfg: object, matrice_valori: list) -> bool : + if not matrice_valori: + logger.info("Nulla da caricare.") + return True sql_insert_RAWDATA = f''' INSERT IGNORE INTO {cfg.dbname}.{cfg.dbrawdata} ( `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`, diff --git a/utils/parsers/by_type/d2w_d2w.py b/utils/parsers/by_type/d2w_d2w.py index c4d4170..87d55d2 100644 --- a/utils/parsers/by_type/d2w_d2w.py +++ b/utils/parsers/by_type/d2w_d2w.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader async def main_loader(cfg: object, id: int) -> None: - return pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/by_type/g201_g201.py b/utils/parsers/by_type/g201_g201.py index be949f6..ce8c3ec 100644 --- a/utils/parsers/by_type/g201_g201.py +++ b/utils/parsers/by_type/g201_g201.py @@ -1,6 +1,7 @@ #!.venv/bin/python # Import necessary modules -from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.database.loader_action import load_data, update_status +from utils.database import DATA_LOADED from utils.csv.data_preparation import make_matrix import logging diff --git a/utils/parsers/by_type/g301_g301.py b/utils/parsers/by_type/g301_g301.py index c4d4170..87d55d2 100644 --- a/utils/parsers/by_type/g301_g301.py +++ b/utils/parsers/by_type/g301_g301.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader async def main_loader(cfg: object, id: int) -> None: - return pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/by_type/g801_iptm.py b/utils/parsers/by_type/g801_iptm.py index c4d4170..87d55d2 100644 --- a/utils/parsers/by_type/g801_iptm.py +++ b/utils/parsers/by_type/g801_iptm.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader async def main_loader(cfg: object, id: int) -> None: - return pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/by_type/g801_loc.py b/utils/parsers/by_type/g801_loc.py index c893001..0233b83 100644 --- a/utils/parsers/by_type/g801_loc.py +++ b/utils/parsers/by_type/g801_loc.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as analog_dig_main_loader async def main_loader(cfg: object, id: int) -> None: - return analog_dig_main_loader(cfg, id, "analogic_digital") \ No newline at end of file + await analog_dig_main_loader(cfg, id, "analogic_digital") diff --git a/utils/parsers/by_type/g801_mums.py b/utils/parsers/by_type/g801_mums.py index b395c5b..c00ddf4 100644 --- a/utils/parsers/by_type/g801_mums.py +++ b/utils/parsers/by_type/g801_mums.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader async def main_loader(cfg: object, id: int) -> None: - return pipe_sep_main_loader(cfg, id, "pipe_separator") + await pipe_sep_main_loader(cfg, id, "pipe_separator") diff --git a/utils/parsers/by_type/g801_musa.py b/utils/parsers/by_type/g801_musa.py index be949f6..ce8c3ec 100644 --- a/utils/parsers/by_type/g801_musa.py +++ b/utils/parsers/by_type/g801_musa.py @@ -1,6 +1,7 @@ #!.venv/bin/python # Import necessary modules -from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.database.loader_action import load_data, update_status +from utils.database import DATA_LOADED from utils.csv.data_preparation import make_matrix import logging diff --git a/utils/parsers/by_type/g801_mux.py b/utils/parsers/by_type/g801_mux.py index be949f6..ce8c3ec 100644 --- a/utils/parsers/by_type/g801_mux.py +++ b/utils/parsers/by_type/g801_mux.py @@ -1,6 +1,7 @@ #!.venv/bin/python # Import necessary modules -from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.database.loader_action import load_data, update_status +from utils.database import DATA_LOADED from utils.csv.data_preparation import make_matrix import logging diff --git a/utils/parsers/by_type/g802_dsas.py b/utils/parsers/by_type/g802_dsas.py index c4d4170..87d55d2 100644 --- a/utils/parsers/by_type/g802_dsas.py +++ b/utils/parsers/by_type/g802_dsas.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader async def main_loader(cfg: object, id: int) -> None: - return pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/by_type/g802_gd.py b/utils/parsers/by_type/g802_gd.py index be949f6..ce8c3ec 100644 --- a/utils/parsers/by_type/g802_gd.py +++ b/utils/parsers/by_type/g802_gd.py @@ -1,6 +1,7 @@ #!.venv/bin/python # Import necessary modules -from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.database.loader_action import load_data, update_status +from utils.database import DATA_LOADED from utils.csv.data_preparation import make_matrix import logging diff --git a/utils/parsers/by_type/g802_loc.py b/utils/parsers/by_type/g802_loc.py index c893001..389fd0f 100644 --- a/utils/parsers/by_type/g802_loc.py +++ b/utils/parsers/by_type/g802_loc.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as analog_dig_main_loader async def main_loader(cfg: object, id: int) -> None: - return analog_dig_main_loader(cfg, id, "analogic_digital") \ No newline at end of file + await analog_dig_main_loader(cfg, id, "analogic_digital") \ No newline at end of file diff --git a/utils/parsers/by_type/g802_modb.py b/utils/parsers/by_type/g802_modb.py index c4d4170..87d55d2 100644 --- a/utils/parsers/by_type/g802_modb.py +++ b/utils/parsers/by_type/g802_modb.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader async def main_loader(cfg: object, id: int) -> None: - return pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/by_type/g802_mums.py b/utils/parsers/by_type/g802_mums.py index c4d4170..87d55d2 100644 --- a/utils/parsers/by_type/g802_mums.py +++ b/utils/parsers/by_type/g802_mums.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader async def main_loader(cfg: object, id: int) -> None: - return pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/by_type/g802_mux.py b/utils/parsers/by_type/g802_mux.py index ba50936..fd969a8 100644 --- a/utils/parsers/by_type/g802_mux.py +++ b/utils/parsers/by_type/g802_mux.py @@ -1,4 +1,4 @@ from .g801_mux import main_loader as g801_mux_main_loader async def main_loader(cfg: object, id: int) -> None: - return g801_mux_main_loader(cfg, id) \ No newline at end of file + await g801_mux_main_loader(cfg, id) \ No newline at end of file diff --git a/utils/parsers/by_type/gs1_gs1.py b/utils/parsers/by_type/gs1_gs1.py index 89de71b..1b94ee6 100644 --- a/utils/parsers/by_type/gs1_gs1.py +++ b/utils/parsers/by_type/gs1_gs1.py @@ -1,4 +1,4 @@ -from ..tlp_tlp import main_loader as tlp_tlp_main_loader +from .tlp_tlp import main_loader as tlp_tlp_main_loader async def main_loader(cfg: object, id: int) -> None: - return tlp_tlp_main_loader(cfg, id) \ No newline at end of file + await tlp_tlp_main_loader(cfg, id) \ No newline at end of file diff --git a/utils/parsers/by_type/hortus_hortus.py b/utils/parsers/by_type/hortus_hortus.py index c4d4170..87d55d2 100644 --- a/utils/parsers/by_type/hortus_hortus.py +++ b/utils/parsers/by_type/hortus_hortus.py @@ -1,4 +1,4 @@ from utils.csv.loaders import main_loader as pipe_sep_main_loader async def main_loader(cfg: object, id: int) -> None: - return pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file + await pipe_sep_main_loader(cfg, id, "pipe_separator") \ No newline at end of file diff --git a/utils/parsers/tlp_tlp.py b/utils/parsers/by_type/tlp_tlp.py similarity index 92% rename from utils/parsers/tlp_tlp.py rename to utils/parsers/by_type/tlp_tlp.py index be949f6..ce8c3ec 100644 --- a/utils/parsers/tlp_tlp.py +++ b/utils/parsers/by_type/tlp_tlp.py @@ -1,6 +1,7 @@ #!.venv/bin/python # Import necessary modules -from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.database.loader_action import load_data, update_status +from utils.database import DATA_LOADED from utils.csv.data_preparation import make_matrix import logging