From 12ac522b9812bf9a6abd6320230b3ea8a7c7a986 Mon Sep 17 00:00:00 2001 From: alex Date: Sun, 11 May 2025 16:40:46 +0200 Subject: [PATCH] load async worker --- load_orchestrator.py | 170 ++++++++++++++++++++++++------- utils/parsers/cr1000x_cr1000x.py | 18 +++- utils/parsers/d2w_d2w.py | 18 +++- utils/parsers/g201_g201.py | 18 +++- utils/parsers/g301_g301.py | 18 +++- utils/parsers/g801_mums.py | 2 +- utils/parsers/g801_musa.py | 20 +++- utils/parsers/g801_mux.py | 20 +++- utils/parsers/g802_gd.py | 20 +++- utils/parsers/tlp_tlp.py | 20 +++- 10 files changed, 255 insertions(+), 69 deletions(-) diff --git a/load_orchestrator.py b/load_orchestrator.py index 2224a2f..5fa2956 100755 --- a/load_orchestrator.py +++ b/load_orchestrator.py @@ -14,70 +14,168 @@ from utils.database.loader_action import CSV_RECEIVED # Initialize the logger for this module logger = logging.getLogger(__name__) -# Function to elaborate CSV data -async def load_csv(cfg: object) -> bool: +# Delay tra un processamento CSV e il successivo (in secondi) +CSV_PROCESSING_DELAY = 0.1 +# Tempo di attesa se non ci sono record da elaborare +NO_RECORD_SLEEP = 20 + + +async def worker(worker_id: int, queue: asyncio.Queue, cfg: object) -> None: + """ + Worker asyncrono che preleva lavori dalla coda e li esegue. + + Args: + worker_id (int): ID univoco del worker. + queue (asyncio.Queue): Coda da cui prendere i lavori. + cfg (object): Configurazione caricata. + """ + logger.info(f"Worker {worker_id} - Avviato") + + while True: + try: + # Preleva un "lavoro" dalla coda (in questo caso non ci sono parametri) + await queue.get() + + logger.info(f"Worker {worker_id} - Inizio elaborazione") + + record, success = await load_csv(cfg) + + if not record: + logger.debug(f"Worker {worker_id} - Nessun record trovato") + await asyncio.sleep(NO_RECORD_SLEEP) + if not success: + logger.error(f"Worker {worker_id} - Errore durante l'elaborazione") + await asyncio.sleep(CSV_PROCESSING_DELAY) + else: + logger.debug(f"Worker {worker_id} - Elaborazione completata correttamente") + await asyncio.sleep(CSV_PROCESSING_DELAY) + + # Segnala che il lavoro è completato + queue.task_done() + except Exception as e: + logger.error(f"Worker {worker_id} - Errore durante l'esecuzione: {e}", exc_info=True) + queue.task_done() + + +async def load_csv(cfg: object) -> tuple: + """ + Cerca e carica un file CSV da elaborare dal database. + + Args: + cfg (object): Oggetto configurazione contenente dati per DB e altro. + + Returns: + bool: True se è stato trovato ed elaborato un record, False altrimenti. + """ + logger.debug("Inizio ricerca nuovo CSV da elaborare") + try: - # Establish a database connection with connetti_db(cfg) as conn: cur = conn.cursor() - # Select a single record from the raw data table that is not currently locked and has a status of 0 - cur.execute(f'select id, unit_type, tool_type from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = {CSV_RECEIVED} limit 1') - id, unit_type, tool_type = cur.fetchone() - if id: - # If a record is found, lock it by updating the 'locked' field to 1 - cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 1 where id = {id}') + logger.debug("Connessione al database stabilita") - # Construct the module name based on unit and tool types for dynamic import + query = f""" + SELECT id, unit_type, tool_type + FROM {cfg.dbname}.{cfg.dbrectable} + WHERE locked = 0 AND status = {CSV_RECEIVED} + LIMIT 1 + """ + logger.debug(f"Esecuzione query: {query}") + cur.execute(query) + result = cur.fetchone() + + if result: + id, unit_type, tool_type = result + logger.info(f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}") + + lock_query = f"UPDATE {cfg.dbname}.{cfg.dbrectable} SET locked = 1 WHERE id = {id}" + logger.debug(f"Lock del record: {lock_query}") + cur.execute(lock_query) + conn.commit() + + # Costruisce il nome del modulo da caricare dinamicamente module_name = f'utils.parsers.{unit_type.lower()}_{tool_type.lower()}' - # Dynamically import the module - modulo = importlib.import_module(module_name) + logger.debug(f"Caricamento modulo dinamico: {module_name}") - # Get the 'main_loader' function from the imported module - funzione = getattr(modulo, "main_loader") + try: + modulo = importlib.import_module(module_name) + funzione = getattr(modulo, "main_loader") + logger.debug(f"Funzione 'main_loader' trovata nel modulo {module_name}") - funzione(cfg, id) - - return True + # Esegui la funzione async + await funzione(cfg, id) + logger.info(f"Elaborazione completata per ID={id}") + return True, True + except (ImportError, AttributeError) as e: + logger.error(f"Errore nel caricamento del modulo o della funzione: {e}", exc_info=True) + return True, False else: - # If no record is found, wait for 20 seconds before attempting to fetch again - await asyncio.sleep(20) - # Return False to indicate that no record was processed - return False + logger.debug("Nessun record disponibile per l'elaborazione") + return False, False except mysql.connector.Error as e: - # Handle database connection errors - logger.error(f'{e}') + logger.error(f"Errore di database: {e}", exc_info=True) + return False, False + async def main(): - # Load the configuration settings + """Main function: avvia i worker e gestisce il ciclo principale.""" + logger.info("Avvio del sistema...") + cfg = setting.Config() + logger.info("Configurazione caricata correttamente") try: - # Configure logging to write log messages to a file with a specific format + # Configura il logging globale logging.basicConfig( format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ", filename=cfg.logfilename, level=logging.INFO, ) + logger.info("Logging configurato correttamente") - # Limita il numero di esecuzioni concorrenti a max_threads - semaphore = asyncio.Semaphore(cfg.max_threads) + # Crea una coda di lavoro illimitata + queue = asyncio.Queue(maxsize=cfg.queue_maxsize or 10) + logger.debug("Coda di lavoro creata") - # Enter an infinite loop to continuously process records + # Numero massimo di worker concorrenti + num_workers = cfg.max_threads + logger.info(f"Avvio di {num_workers} worker concorrenti") + + # Avvia i worker + workers = [ + asyncio.create_task(worker(i, queue, cfg)) for i in range(num_workers) + ] + + logger.info("Sistema avviato correttamente. In attesa di nuovi task...") + + # Ciclo infinito per aggiungere lavori alla coda while True: - async with semaphore: - try: - await asyncio.create_task(load_csv(cfg)) + logger.debug("Aggiunta di un nuovo lavoro alla coda") + await queue.put(None) - except Exception as e: - logger.error(f"Error: {e}.") + # Breve attesa prima di aggiungere un altro lavoro + await asyncio.sleep(0.5) except KeyboardInterrupt: - # Handle a keyboard interrupt (e.g., Ctrl+C) to gracefully shut down the program - logger.info("Info: Shutdown requested...exiting") + logger.info("Info: Shutdown richiesto... chiusura in corso") + + # Attendi che tutti i lavori pendenti siano completati + logger.info("Attesa completamento dei task in coda...") + await queue.join() + + # Ferma i worker + logger.info("Chiusura dei worker in corso...") + for task in workers: + task.cancel() + + await asyncio.gather(*workers, return_exceptions=True) + + logger.info("Info: Tutti i task terminati. Uscita.") except Exception as e: - logger.error(f"Error: {e}.") + logger.error(f"Errore principale: {e}", exc_info=True) + if __name__ == "__main__": asyncio.run(main()) \ No newline at end of file diff --git a/utils/parsers/cr1000x_cr1000x.py b/utils/parsers/cr1000x_cr1000x.py index 44b949d..5ec47c0 100644 --- a/utils/parsers/cr1000x_cr1000x.py +++ b/utils/parsers/cr1000x_cr1000x.py @@ -1,3 +1,15 @@ -def main_loader(unit, tool): - print(f'{__name__}: {unit} - {tool}') - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +#!.venv/bin/python +# Import necessary modules +from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.parsers.data_preparation import make_matrix +import logging + +logger = logging.getLogger(__name__) + +# Define the main function for loading data +async def main_loader(cfg: object, id: int) -> None: + # Create a matrix of values from the data + matrice_valori = make_matrix(cfg, id) + # Load the data into the database + if load_data(cfg, matrice_valori): + update_status(cfg, id, DATA_LOADED) \ No newline at end of file diff --git a/utils/parsers/d2w_d2w.py b/utils/parsers/d2w_d2w.py index a57a82a..5ec47c0 100644 --- a/utils/parsers/d2w_d2w.py +++ b/utils/parsers/d2w_d2w.py @@ -1,3 +1,15 @@ -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +#!.venv/bin/python +# Import necessary modules +from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.parsers.data_preparation import make_matrix +import logging + +logger = logging.getLogger(__name__) + +# Define the main function for loading data +async def main_loader(cfg: object, id: int) -> None: + # Create a matrix of values from the data + matrice_valori = make_matrix(cfg, id) + # Load the data into the database + if load_data(cfg, matrice_valori): + update_status(cfg, id, DATA_LOADED) \ No newline at end of file diff --git a/utils/parsers/g201_g201.py b/utils/parsers/g201_g201.py index a57a82a..5ec47c0 100644 --- a/utils/parsers/g201_g201.py +++ b/utils/parsers/g201_g201.py @@ -1,3 +1,15 @@ -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +#!.venv/bin/python +# Import necessary modules +from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.parsers.data_preparation import make_matrix +import logging + +logger = logging.getLogger(__name__) + +# Define the main function for loading data +async def main_loader(cfg: object, id: int) -> None: + # Create a matrix of values from the data + matrice_valori = make_matrix(cfg, id) + # Load the data into the database + if load_data(cfg, matrice_valori): + update_status(cfg, id, DATA_LOADED) \ No newline at end of file diff --git a/utils/parsers/g301_g301.py b/utils/parsers/g301_g301.py index a57a82a..5ec47c0 100644 --- a/utils/parsers/g301_g301.py +++ b/utils/parsers/g301_g301.py @@ -1,3 +1,15 @@ -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +#!.venv/bin/python +# Import necessary modules +from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.parsers.data_preparation import make_matrix +import logging + +logger = logging.getLogger(__name__) + +# Define the main function for loading data +async def main_loader(cfg: object, id: int) -> None: + # Create a matrix of values from the data + matrice_valori = make_matrix(cfg, id) + # Load the data into the database + if load_data(cfg, matrice_valori): + update_status(cfg, id, DATA_LOADED) \ No newline at end of file diff --git a/utils/parsers/g801_mums.py b/utils/parsers/g801_mums.py index 3afa64b..0c96fcb 100644 --- a/utils/parsers/g801_mums.py +++ b/utils/parsers/g801_mums.py @@ -7,7 +7,7 @@ import logging logger = logging.getLogger(__name__) # Define the main function for loading data -def main_loader(cfg: object, id: int) -> None: +async def main_loader(cfg: object, id: int) -> None: # Create a matrix of values from the data matrice_valori = make_matrix(cfg, id) # Load the data into the database diff --git a/utils/parsers/g801_musa.py b/utils/parsers/g801_musa.py index 77dc2c8..5ec47c0 100644 --- a/utils/parsers/g801_musa.py +++ b/utils/parsers/g801_musa.py @@ -1,5 +1,15 @@ -import time -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - time.sleep(20) - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +#!.venv/bin/python +# Import necessary modules +from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.parsers.data_preparation import make_matrix +import logging + +logger = logging.getLogger(__name__) + +# Define the main function for loading data +async def main_loader(cfg: object, id: int) -> None: + # Create a matrix of values from the data + matrice_valori = make_matrix(cfg, id) + # Load the data into the database + if load_data(cfg, matrice_valori): + update_status(cfg, id, DATA_LOADED) \ No newline at end of file diff --git a/utils/parsers/g801_mux.py b/utils/parsers/g801_mux.py index 77dc2c8..5ec47c0 100644 --- a/utils/parsers/g801_mux.py +++ b/utils/parsers/g801_mux.py @@ -1,5 +1,15 @@ -import time -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - time.sleep(20) - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +#!.venv/bin/python +# Import necessary modules +from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.parsers.data_preparation import make_matrix +import logging + +logger = logging.getLogger(__name__) + +# Define the main function for loading data +async def main_loader(cfg: object, id: int) -> None: + # Create a matrix of values from the data + matrice_valori = make_matrix(cfg, id) + # Load the data into the database + if load_data(cfg, matrice_valori): + update_status(cfg, id, DATA_LOADED) \ No newline at end of file diff --git a/utils/parsers/g802_gd.py b/utils/parsers/g802_gd.py index 77dc2c8..5ec47c0 100644 --- a/utils/parsers/g802_gd.py +++ b/utils/parsers/g802_gd.py @@ -1,5 +1,15 @@ -import time -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - time.sleep(20) - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +#!.venv/bin/python +# Import necessary modules +from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.parsers.data_preparation import make_matrix +import logging + +logger = logging.getLogger(__name__) + +# Define the main function for loading data +async def main_loader(cfg: object, id: int) -> None: + # Create a matrix of values from the data + matrice_valori = make_matrix(cfg, id) + # Load the data into the database + if load_data(cfg, matrice_valori): + update_status(cfg, id, DATA_LOADED) \ No newline at end of file diff --git a/utils/parsers/tlp_tlp.py b/utils/parsers/tlp_tlp.py index 77dc2c8..5ec47c0 100644 --- a/utils/parsers/tlp_tlp.py +++ b/utils/parsers/tlp_tlp.py @@ -1,5 +1,15 @@ -import time -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - time.sleep(20) - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +#!.venv/bin/python +# Import necessary modules +from utils.database.loader_action import load_data, update_status, DATA_LOADED +from utils.parsers.data_preparation import make_matrix +import logging + +logger = logging.getLogger(__name__) + +# Define the main function for loading data +async def main_loader(cfg: object, id: int) -> None: + # Create a matrix of values from the data + matrice_valori = make_matrix(cfg, id) + # Load the data into the database + if load_data(cfg, matrice_valori): + update_status(cfg, id, DATA_LOADED) \ No newline at end of file