import asyncio
import contextvars
import logging
import os
import signal
from collections.abc import Callable, Coroutine
from logging.handlers import RotatingFileHandler
from typing import Any

import aiomysql

# Context variable that identifies the current worker in log records.
worker_context = contextvars.ContextVar("worker_id", default="^-^")

# Global shutdown event, set by the signal handlers and awaited by the orchestrator.
shutdown_event = asyncio.Event()

# Maximum time (seconds) granted to cancelled workers to finish.
_WORKER_GRACE_PERIOD = 30.0


class WorkerFormatter(logging.Formatter):
    """Log formatter that injects the current worker ID into every record."""

    def format(self, record: logging.LogRecord) -> str:
        """Format a log record, exposing the worker ID as ``%(worker_id)s``.

        Args:
            record: The log record to format.

        Returns:
            The formatted log line.
        """
        record.worker_id = worker_context.get()
        return super().format(record)


def setup_logging(log_filename: str, log_level_str: str) -> None:
    """Configure the root logger with a rotating file handler and a console handler.

    Any handler already attached to the root logger is removed and closed
    first, so calling this function more than once does not leak open log
    files or duplicate output.

    Args:
        log_filename: Path of the log file.
        log_level_str: Level name (e.g. "INFO", "DEBUG"). Unknown names fall
            back to INFO.
    """
    logger = logging.getLogger()
    formatter = WorkerFormatter(
        "%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s"
    )

    # Detach AND close existing handlers; clearing the list alone would leave
    # file handlers' underlying streams open.
    for old_handler in list(logger.handlers):
        logger.removeHandler(old_handler)
        old_handler.close()

    # Rotating file handler: 10 MB per file, 5 backups kept.
    file_handler = RotatingFileHandler(
        log_filename,
        maxBytes=10 * 1024 * 1024,
        backupCount=5,
        encoding="utf-8",
    )
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # Console handler (useful when running under Docker).
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    log_level = getattr(logging, log_level_str.upper(), logging.INFO)
    if not isinstance(log_level, int):
        # The name matched a non-level attribute of the logging module
        # (e.g. "BASIC_FORMAT"); setLevel() would reject it.
        log_level = logging.INFO
    logger.setLevel(log_level)

    logger.info("Logging configurato correttamente con rotation (10MB, 5 backup)")


def setup_signal_handlers(logger: logging.Logger) -> None:
    """Install SIGTERM and SIGINT handlers that trigger a graceful shutdown.

    When called from inside a running event loop, the handlers are registered
    through ``loop.add_signal_handler`` so that setting ``shutdown_event``
    happens as a regular loop callback and wakes the loop up. If no loop is
    running, or the loop does not support signal handlers (e.g. on Windows),
    the function falls back to ``signal.signal``.

    Args:
        logger: Logger used to report the received signal.
    """

    def signal_handler(signum, frame):
        """Log the received signal and request shutdown."""
        sig_name = signal.Signals(signum).name
        logger.info(f"Ricevuto segnale {sig_name} ({signum}). Avvio shutdown graceful...")
        shutdown_event.set()

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    for sig in (signal.SIGTERM, signal.SIGINT):
        if loop is not None:
            try:
                # Extra positional args are forwarded to the callback, which
                # keeps the (signum, frame) signature usable for both paths.
                loop.add_signal_handler(sig, signal_handler, sig, None)
                continue
            except (NotImplementedError, RuntimeError):
                # Loop cannot manage signals here; use the plain handler.
                pass
        signal.signal(sig, signal_handler)

    logger.info("Signal handlers configurati (SIGTERM, SIGINT)")


async def _cancel_and_wait(pending: set, worker_count: int, logger: logging.Logger) -> None:
    """Cancel every task in *pending* and wait up to the grace period for them."""
    for task in pending:
        if not task.done():
            task.cancel()

    if not pending:
        return

    if worker_count:
        logger.info(f"In attesa della terminazione di {worker_count} worker...")
    try:
        await asyncio.wait_for(
            asyncio.gather(*pending, return_exceptions=True),
            timeout=_WORKER_GRACE_PERIOD,
        )
        logger.info("Tutti i worker terminati correttamente")
    except asyncio.TimeoutError:
        # asyncio.TimeoutError is the builtin TimeoutError on 3.11+, and a
        # distinct class on older versions; this spelling covers both.
        logger.warning("Timeout raggiunto. Alcuni worker potrebbero non essere terminati correttamente")


async def run_orchestrator(
    config_class: Any,
    worker_coro: Callable[[int, Any, Any], Coroutine[Any, Any, None]],
) -> None:
    """Initialise and run an orchestrator with ``cfg.max_threads`` workers.

    Sets up logging, signal handlers and the aiomysql connection pool, starts
    the workers and then waits until either a shutdown signal arrives or a
    worker terminates. In both cases the remaining workers are cancelled and
    given a grace period to finish before the pool is closed.

    Args:
        config_class: Configuration class to instantiate (called with no
            arguments). The instance must expose ``logfilename``,
            ``max_threads``, ``dbhost``, ``dbuser``, ``dbpass`` and ``dbname``.
        worker_coro: Coroutine function run once per worker; it receives the
            worker index, the configuration instance and the connection pool.
    """
    logger = logging.getLogger()
    # NOTE(review): logging is configured only inside the try block below, so
    # these two INFO messages are emitted before any handler is attached.
    logger.info("Avvio del sistema...")

    cfg = config_class()
    logger.info("Configurazione caricata correttamente")

    debug_mode = False
    pool = None

    try:
        log_level = os.getenv("LOG_LEVEL", "INFO").upper()
        setup_logging(cfg.logfilename, log_level)
        debug_mode = logger.getEffectiveLevel() == logging.DEBUG

        # Signal handlers for graceful shutdown.
        setup_signal_handlers(logger)

        logger.info(f"Avvio di {cfg.max_threads} worker concorrenti")

        pool = await aiomysql.create_pool(
            host=cfg.dbhost,
            user=cfg.dbuser,
            password=cfg.dbpass,
            db=cfg.dbname,
            minsize=cfg.max_threads,
            maxsize=cfg.max_threads * 2,  # twice the worker count
            pool_recycle=3600,
            # Original author's note: aiomysql has no pool_pre_ping like
            # SQLAlchemy; stale connections are handled via pool_recycle.
        )

        tasks = [asyncio.create_task(worker_coro(i, cfg, pool)) for i in range(cfg.max_threads)]

        logger.info("Sistema avviato correttamente. In attesa di nuovi task...")

        # Wait for either a worker to finish or the shutdown signal.
        shutdown_task = asyncio.create_task(shutdown_event.wait())
        done, pending = await asyncio.wait(
            [shutdown_task, *tasks],
            return_when=asyncio.FIRST_COMPLETED,
        )

        if shutdown_event.is_set():
            logger.info("Shutdown event rilevato. Cancellazione worker in corso...")
        else:
            # A worker returned or crashed on its own. The pool is about to be
            # closed, so the remaining workers must be stopped as well.
            logger.warning(
                "Un worker ha terminato l'esecuzione senza richiesta di shutdown. "
                "Arresto dei worker rimanenti..."
            )

        # Retrieve exceptions of finished workers so they are reported
        # instead of being silently dropped.
        for task in done:
            if task is shutdown_task or task.cancelled():
                continue
            worker_error = task.exception()
            if worker_error is not None:
                logger.error(
                    f"Worker terminato con errore: {worker_error}",
                    exc_info=worker_error if debug_mode else None,
                )

        # Cancel whatever is still running (workers and, if no signal was
        # received, the shutdown waiter) and wait for it to finish.
        worker_count = sum(1 for task in pending if task is not shutdown_task)
        await _cancel_and_wait(pending, worker_count, logger)

    except KeyboardInterrupt:
        logger.info("Info: Shutdown richiesto da KeyboardInterrupt... chiusura in corso")

    except Exception as e:
        logger.error(f"Errore principale: {e}", exc_info=debug_mode)

    finally:
        # Always release the connection pool.
        if pool:
            logger.info("Chiusura pool di connessioni database...")
            pool.close()
            await pool.wait_closed()
            logger.info("Pool database chiuso correttamente")

        logger.info("Shutdown completato")