#!.venv/bin/python # Import necessary libraries import mysql.connector import logging import importlib import asyncio import os # Import custom modules for configuration and database connection from utils.config import loader_load_data as setting from utils.database.connection import connetti_db from utils.database import CSV_RECEIVED # Initialize the logger for this module logger = logging.getLogger() # Delay tra un processamento CSV e il successivo (in secondi) CSV_PROCESSING_DELAY = 0.1 # Tempo di attesa se non ci sono record da elaborare NO_RECORD_SLEEP = 20 async def worker(worker_id: int, queue: asyncio.Queue, cfg: object) -> None: """ Worker asyncrono che preleva lavori dalla coda e li esegue. Args: worker_id (int): ID univoco del worker. queue (asyncio.Queue): Coda da cui prendere i lavori. cfg (object): Configurazione caricata. """ debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG) logger.info(f"Worker {worker_id} - Avviato") while True: try: # Preleva un "lavoro" dalla coda (in questo caso non ci sono parametri) await queue.get() logger.info(f"Worker {worker_id} - Inizio elaborazione") record, success = await load_csv(cfg) if not record: logger.debug(f"Worker {worker_id} - Nessun record trovato") await asyncio.sleep(NO_RECORD_SLEEP) if not success: logger.error(f"Worker {worker_id} - Errore durante l'elaborazione") await asyncio.sleep(CSV_PROCESSING_DELAY) else: logger.debug(f"Worker {worker_id} - Elaborazione completata correttamente") await asyncio.sleep(CSV_PROCESSING_DELAY) # Segnala che il lavoro è completato queue.task_done() except Exception as e: logger.error(f"Worker {worker_id} - Errore durante l'esecuzione: {e}", exc_info=debug_mode) queue.task_done() async def load_csv(cfg: object) -> tuple: """ Cerca e carica un file CSV da elaborare dal database. Args: cfg (object): Oggetto configurazione contenente dati per DB e altro. Returns: bool: True se è stato trovato ed elaborato un record, False altrimenti. """ debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG) logger.debug("Inizio ricerca nuovo CSV da elaborare") try: with connetti_db(cfg) as conn: cur = conn.cursor() logger.debug("Connessione al database stabilita") query = f""" SELECT id, unit_type, tool_type, unit_name, tool_name FROM {cfg.dbname}.{cfg.dbrectable} WHERE locked = 0 AND status = {CSV_RECEIVED} LIMIT 1 """ logger.debug(f"Esecuzione query: {query}") cur.execute(query) result = cur.fetchone() if result: id, unit_type, tool_type, unit_name, tool_name = result logger.info(f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}") lock_query = f"UPDATE {cfg.dbname}.{cfg.dbrectable} SET locked = 1 WHERE id = {id}" logger.debug(f"Lock del record: {lock_query}") cur.execute(lock_query) conn.commit() # Costruisce il nome del modulo da caricare dinamicamente module_names = [f'utils.parsers.by_name.{unit_name.lower()}_{tool_name.lower()}', f'utils.parsers.by_name.{unit_name.lower()}_{tool_type.lower()}', f'utils.parsers.by_name.{unit_name.lower()}_all', f'utils.parsers.by_type.{unit_type.lower()}_{tool_type.lower()}'] modulo = None for module_name in module_names: try: logger.debug(f"Caricamento dinamico del modulo: {module_name}") modulo = importlib.import_module(module_name) logger.debug(f"Funzione 'main_loader' caricata dal modulo {module_name}") except (ImportError, AttributeError) as e: logger.warning(f"Modulo {module_name} non trovato: {e}", exc_info=debug_mode) if not modulo: logger.error(f"Nessun modulo trovato {module_names}") return True, False # Ottiene la funzione 'main_loader' dal modulo funzione = getattr(modulo, "main_loader") # Esegui la funzione await funzione(cfg, id) logger.info(f"Elaborazione completata per ID={id}") return True, True else: logger.debug("Nessun record disponibile per l'elaborazione") return False, False except mysql.connector.Error as e: logger.error(f"Errore di database: {e}", exc_info=debug_mode) return False, False async def main(): """Main function: avvia i worker e gestisce il ciclo principale.""" logger.info("Avvio del sistema...") cfg = setting.Config() logger.info("Configurazione caricata correttamente") try: # Configura il logging globale log_level = os.getenv("LOG_LEVEL", "INFO").upper() debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG) logging.basicConfig( format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ", filename=cfg.logfilename, level=log_level, ) logger.info("Logging configurato correttamente") # Crea una coda di lavoro illimitata queue = asyncio.Queue(maxsize=cfg.max_threads * 2 or 20) logger.debug("Coda di lavoro creata") # Numero massimo di worker concorrenti num_workers = cfg.max_threads logger.info(f"Avvio di {num_workers} worker concorrenti") # Avvia i worker workers = [ asyncio.create_task(worker(i, queue, cfg)) for i in range(num_workers) ] logger.info("Sistema avviato correttamente. In attesa di nuovi task...") # Ciclo infinito per aggiungere lavori alla coda while True: logger.debug("Aggiunta di un nuovo lavoro alla coda") await queue.put(None) # Breve attesa prima di aggiungere un altro lavoro await asyncio.sleep(0.5) except KeyboardInterrupt: logger.info("Info: Shutdown richiesto... chiusura in corso") # Attendi che tutti i lavori pendenti siano completati logger.info("Attesa completamento dei task in coda...") await queue.join() # Ferma i worker logger.info("Chiusura dei worker in corso...") for task in workers: task.cancel() await asyncio.gather(*workers, return_exceptions=debug_mode) logger.info("Info: Tutti i task terminati. Uscita.") except Exception as e: logger.error(f"Errore principale: {e}", exc_info=debug_mode) if __name__ == "__main__": asyncio.run(main())