diff --git a/elab_orchestrator.py b/elab_orchestrator.py index 9a94208..f59b6a1 100755 --- a/elab_orchestrator.py +++ b/elab_orchestrator.py @@ -3,35 +3,13 @@ # Import necessary libraries import logging import asyncio -import os -import aiomysql -import contextvars - # Import custom modules for configuration and database connection from utils.config import loader_matlab_elab as setting from utils.database import DATA_LOADED from utils.database.matlab_query import get_matlab_command from utils.csv.loaders import get_next_csv_atomic - - -# Crea una context variable per identificare il worker -worker_context = contextvars.ContextVar('worker_id', default='^-^') - -# Formatter personalizzato che include il worker_id -class WorkerFormatter(logging.Formatter): - """Formatter personalizzato che include l'ID del worker nei log.""" - def format(self, record: str) -> str: - """Formatta il record di log includendo l'ID del worker. - - Args: - record (str): Il record di log da formattare. - - Returns: - La stringa formattata del record di log. - """ - record.worker_id = worker_context.get() - return super().format(record) +from utils.orchestrator_utils import run_orchestrator, worker_context # Initialize the logger for this module logger = logging.getLogger() @@ -56,7 +34,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None: # Imposta il context per questo worker worker_context.set(f"W{worker_id:02d}") - debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG) + debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG logger.info("Avviato") while True: @@ -97,71 +75,8 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None: async def main(): - """Funzione principale che inizializza e avvia il sistema di elaborazione. - - Questa funzione si occupa di: - - Caricare la configurazione. - - Impostare il logging. - - Creare un pool di connessioni al database. - - Avviare i worker concorrenti per l'elaborazione. - - Gestire l'arresto controllato del sistema. - """ - logger.info("Avvio del sistema...") - - cfg = setting.Config() - logger.info("Configurazione caricata correttamente") - - try: - # Configura il logging globale - log_level = os.getenv("LOG_LEVEL", "INFO").upper() - debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG) - - # Configura il logging con il formatter personalizzato - handler = logging.FileHandler(cfg.logfilename) - formatter = WorkerFormatter( - "%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s" - ) - handler.setFormatter(formatter) - - # Rimuovi eventuali handler esistenti e aggiungi il nostro - logger.handlers.clear() - logger.addHandler(handler) - logger.setLevel(getattr(logging, log_level)) - - logger.info("Logging configurato correttamente") - - # Numero massimo di worker concorrenti - logger.info(f"Avvio di {cfg.max_threads} worker concorrenti") - - pool = await aiomysql.create_pool( - host=cfg.dbhost, - user=cfg.dbuser, - password=cfg.dbpass, - db=cfg.dbname, - minsize=cfg.max_threads, - maxsize=cfg.max_threads*4, - pool_recycle=3600 - ) - - # Avvia i worker - workers = [ - asyncio.create_task(worker(i, cfg, pool)) - for i in range(cfg.max_threads) - ] - - logger.info("Sistema avviato correttamente. In attesa di nuovi task...") - - try: - await asyncio.gather(*workers, return_exceptions=debug_mode) - finally: - pool.close() - await pool.wait_closed() - - except KeyboardInterrupt: - logger.info("Info: Shutdown richiesto... chiusura in corso") - - except Exception as e: - logger.error(f"Errore principale: {e}", exc_info=debug_mode) + """Funzione principale che avvia l'elab_orchestrator.""" + await run_orchestrator(setting.Config, worker) if __name__ == "__main__": asyncio.run(main()) \ No newline at end of file diff --git a/load_orchestrator.py b/load_orchestrator.py index 46efaba..2c07782 100755 --- a/load_orchestrator.py +++ b/load_orchestrator.py @@ -4,36 +4,12 @@ import logging import importlib import asyncio -import os -import aiomysql -import contextvars # Import custom modules for configuration and database connection from utils.config import loader_load_data as setting from utils.database import CSV_RECEIVED - from utils.csv.loaders import get_next_csv_atomic - -# Crea una context variable per identificare il worker -worker_context = contextvars.ContextVar("worker_id", default="^-^") - - -# Formatter personalizzato che include il worker_id -class WorkerFormatter(logging.Formatter): - """Formatter personalizzato per i log che include l'ID del worker.""" - - def format(self, record: str) -> str: - """Formatta il record di log includendo l'ID del worker. - - Args: - record (str): Il record di log da formattare. - - Returns: - La stringa formattata del record di log. - """ - record.worker_id = worker_context.get() - return super().format(record) - +from utils.orchestrator_utils import run_orchestrator, worker_context # Initialize the logger for this module logger = logging.getLogger() @@ -43,7 +19,6 @@ CSV_PROCESSING_DELAY = 0.2 # Tempo di attesa se non ci sono record da elaborare NO_RECORD_SLEEP = 60 - async def worker(worker_id: int, cfg: object, pool: object) -> None: """Esegue il ciclo di lavoro per l'elaborazione dei file CSV. @@ -137,70 +112,8 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool: async def main(): - """Funzione principale che inizializza e avvia il sistema. - - Questa funzione si occupa di: - - Caricare la configurazione. - - Impostare il logging. - - Creare un pool di connessioni al database. - - Avviare i worker concorrenti per l'elaborazione dei CSV. - - Gestire l'arresto controllato del sistema. - """ - logger.info("Avvio del sistema...") - - cfg = setting.Config() - logger.info("Configurazione caricata correttamente") - - try: - # Configura il logging globale - log_level = os.getenv("LOG_LEVEL", "INFO").upper() - debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG - - # Configura il logging con il formatter personalizzato - handler = logging.FileHandler(cfg.logfilename) - formatter = WorkerFormatter( - "%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s" - ) - handler.setFormatter(formatter) - - # Rimuovi eventuali handler esistenti e aggiungi il nostro - logger.handlers.clear() - logger.addHandler(handler) - logger.setLevel(getattr(logging, log_level)) - - logger.info("Logging configurato correttamente") - - # Numero massimo di worker concorrenti - logger.info(f"Avvio di {cfg.max_threads} worker concorrenti") - - pool = await aiomysql.create_pool( - host=cfg.dbhost, - user=cfg.dbuser, - password=cfg.dbpass, - db=cfg.dbname, - minsize=cfg.max_threads, - maxsize=cfg.max_threads * 4, - pool_recycle=3600, - ) - - # Avvia i worker - workers = [ - asyncio.create_task(worker(i, cfg, pool)) for i in range(cfg.max_threads) - ] - - logger.info("Sistema avviato correttamente. In attesa di nuovi task...") - - try: - await asyncio.gather(*workers, return_exceptions=debug_mode) - finally: - pool.close() - await pool.wait_closed() - - except KeyboardInterrupt: - logger.info("Info: Shutdown richiesto... chiusura in corso") - - except Exception as e: - logger.error(f"Errore principale: {e}", exc_info=debug_mode) + """Funzione principale che avvia il load_orchestrator.""" + await run_orchestrator(setting.Config, worker) if __name__ == "__main__": diff --git a/utils/orchestrator_utils.py b/utils/orchestrator_utils.py new file mode 100644 index 0000000..c4255bf --- /dev/null +++ b/utils/orchestrator_utils.py @@ -0,0 +1,104 @@ +import logging +import asyncio +import os +import aiomysql +import contextvars +from typing import Callable, Coroutine, Any + +# Crea una context variable per identificare il worker +worker_context = contextvars.ContextVar("worker_id", default="^-^") + + +# Formatter personalizzato che include il worker_id +class WorkerFormatter(logging.Formatter): + """Formatter personalizzato per i log che include l'ID del worker.""" + + def format(self, record: logging.LogRecord) -> str: + """Formatta il record di log includendo l'ID del worker. + + Args: + record (str): Il record di log da formattare. + + Returns: + La stringa formattata del record di log. + """ + record.worker_id = worker_context.get() + return super().format(record) + + +def setup_logging(log_filename: str, log_level_str: str): + """Configura il logging globale. + + Args: + log_filename (str): Percorso del file di log. + log_level_str (str): Livello di log (es. "INFO", "DEBUG"). + """ + logger = logging.getLogger() + handler = logging.FileHandler(log_filename) + formatter = WorkerFormatter( + "%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s" + ) + handler.setFormatter(formatter) + + # Rimuovi eventuali handler esistenti e aggiungi il nostro + if logger.hasHandlers(): + logger.handlers.clear() + logger.addHandler(handler) + log_level = getattr(logging, log_level_str.upper(), logging.INFO) + logger.setLevel(log_level) + logger.info("Logging configurato correttamente") + + +async def run_orchestrator( + config_class: Any, + worker_coro: Callable[[int, Any, Any], Coroutine[Any, Any, None]], +): + """Funzione principale che inizializza e avvia un orchestratore. + + Args: + config_class: La classe di configurazione da istanziare. + worker_coro: La coroutine del worker da eseguire in parallelo. + """ + logger = logging.getLogger() + logger.info("Avvio del sistema...") + + cfg = config_class() + logger.info("Configurazione caricata correttamente") + + debug_mode = False + try: + log_level = os.getenv("LOG_LEVEL", "INFO").upper() + setup_logging(cfg.logfilename, log_level) + debug_mode = logger.getEffectiveLevel() == logging.DEBUG + + logger.info(f"Avvio di {cfg.max_threads} worker concorrenti") + + pool = await aiomysql.create_pool( + host=cfg.dbhost, + user=cfg.dbuser, + password=cfg.dbpass, + db=cfg.dbname, + minsize=cfg.max_threads, + maxsize=cfg.max_threads * 4, + pool_recycle=3600, + ) + + tasks = [ + asyncio.create_task(worker_coro(i, cfg, pool)) + for i in range(cfg.max_threads) + ] + + + logger.info("Sistema avviato correttamente. In attesa di nuovi task...") + + try: + await asyncio.gather(*tasks, return_exceptions=debug_mode) + finally: + pool.close() + await pool.wait_closed() + + except KeyboardInterrupt: + logger.info("Info: Shutdown richiesto... chiusura in corso") + + except Exception as e: + logger.error(f"Errore principale: {e}", exc_info=debug_mode) \ No newline at end of file