import logging import asyncio import os import aiomysql import contextvars from typing import Callable, Coroutine, Any # Crea una context variable per identificare il worker worker_context = contextvars.ContextVar("worker_id", default="^-^") # Formatter personalizzato che include il worker_id class WorkerFormatter(logging.Formatter): """Formatter personalizzato per i log che include l'ID del worker.""" def format(self, record: logging.LogRecord) -> str: """Formatta il record di log includendo l'ID del worker. Args: record (str): Il record di log da formattare. Returns: La stringa formattata del record di log. """ record.worker_id = worker_context.get() return super().format(record) def setup_logging(log_filename: str, log_level_str: str): """Configura il logging globale. Args: log_filename (str): Percorso del file di log. log_level_str (str): Livello di log (es. "INFO", "DEBUG"). """ logger = logging.getLogger() handler = logging.FileHandler(log_filename) formatter = WorkerFormatter( "%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s" ) handler.setFormatter(formatter) # Rimuovi eventuali handler esistenti e aggiungi il nostro if logger.hasHandlers(): logger.handlers.clear() logger.addHandler(handler) log_level = getattr(logging, log_level_str.upper(), logging.INFO) logger.setLevel(log_level) logger.info("Logging configurato correttamente") async def run_orchestrator( config_class: Any, worker_coro: Callable[[int, Any, Any], Coroutine[Any, Any, None]], ): """Funzione principale che inizializza e avvia un orchestratore. Args: config_class: La classe di configurazione da istanziare. worker_coro: La coroutine del worker da eseguire in parallelo. """ logger = logging.getLogger() logger.info("Avvio del sistema...") cfg = config_class() logger.info("Configurazione caricata correttamente") debug_mode = False try: log_level = os.getenv("LOG_LEVEL", "INFO").upper() setup_logging(cfg.logfilename, log_level) debug_mode = logger.getEffectiveLevel() == logging.DEBUG logger.info(f"Avvio di {cfg.max_threads} worker concorrenti") pool = await aiomysql.create_pool( host=cfg.dbhost, user=cfg.dbuser, password=cfg.dbpass, db=cfg.dbname, minsize=cfg.max_threads, maxsize=cfg.max_threads * 4, pool_recycle=3600, ) tasks = [ asyncio.create_task(worker_coro(i, cfg, pool)) for i in range(cfg.max_threads) ] logger.info("Sistema avviato correttamente. In attesa di nuovi task...") try: await asyncio.gather(*tasks, return_exceptions=debug_mode) finally: pool.close() await pool.wait_closed() except KeyboardInterrupt: logger.info("Info: Shutdown richiesto... chiusura in corso") except Exception as e: logger.error(f"Errore principale: {e}", exc_info=debug_mode)