altre fix
This commit is contained in:
@@ -6,11 +6,21 @@ import importlib
|
||||
import asyncio
|
||||
import os
|
||||
import aiomysql
|
||||
import contextvars
|
||||
|
||||
# Import custom modules for configuration and database connection
|
||||
from utils.config import loader_load_data as setting
|
||||
from utils.database import CSV_RECEIVED
|
||||
|
||||
# Crea una context variable per identificare il worker
|
||||
worker_context = contextvars.ContextVar('worker_id', default='00')
|
||||
|
||||
# Formatter personalizzato che include il worker_id
|
||||
class WorkerFormatter(logging.Formatter):
|
||||
def format(self, record):
|
||||
record.worker_id = worker_context.get()
|
||||
return super().format(record)
|
||||
|
||||
# Initialize the logger for this module
|
||||
logger = logging.getLogger()
|
||||
|
||||
@@ -45,34 +55,36 @@ async def get_next_csv_atomic(pool, table_name):
|
||||
return result
|
||||
|
||||
async def worker(worker_id: int, cfg: object, pool) -> None:
|
||||
# Imposta il context per questo worker
|
||||
worker_context.set(f"W{worker_id}")
|
||||
|
||||
debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
|
||||
logger.info(f"Worker {worker_id} - Avviato")
|
||||
logger.info("Avviato")
|
||||
|
||||
while True:
|
||||
try:
|
||||
logger.info(f"Worker {worker_id} - Inizio elaborazione")
|
||||
logger.info("Inizio elaborazione")
|
||||
|
||||
record = await get_next_csv_atomic(pool, cfg.dbrectable)
|
||||
|
||||
if record:
|
||||
success = await load_csv(record, cfg, worker_id, pool)
|
||||
success = await load_csv(record, cfg, pool)
|
||||
if not success:
|
||||
logger.error(f"Worker {worker_id} - Errore durante l'elaborazione")
|
||||
logger.error("Errore durante l'elaborazione")
|
||||
await asyncio.sleep(CSV_PROCESSING_DELAY)
|
||||
else:
|
||||
await asyncio.sleep(NO_RECORD_SLEEP)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Worker {worker_id} - Errore durante l'esecuzione: {e}", exc_info=debug_mode)
|
||||
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def load_csv(record: tuple, cfg: object, worker_id: int, pool) -> bool:
|
||||
|
||||
async def load_csv(record: tuple, cfg: object, pool) -> bool:
|
||||
debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
|
||||
logger.debug(f"Worker {worker_id} - Inizio ricerca nuovo CSV da elaborare")
|
||||
logger.debug("Inizio ricerca nuovo CSV da elaborare")
|
||||
|
||||
id, unit_type, tool_type, unit_name, tool_name = record
|
||||
logger.info(f"Worker {worker_id} - Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}")
|
||||
logger.info(f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}")
|
||||
|
||||
# Costruisce il nome del modulo da caricare dinamicamente
|
||||
module_names = [f'utils.parsers.by_name.{unit_name.lower()}_{tool_name.lower()}',
|
||||
@@ -82,30 +94,26 @@ async def load_csv(record: tuple, cfg: object, worker_id: int, pool) -> bool:
|
||||
modulo = None
|
||||
for module_name in module_names:
|
||||
try:
|
||||
logger.debug(f"Worker {worker_id} - Caricamento dinamico del modulo: {module_name}")
|
||||
logger.debug(f"Caricamento dinamico del modulo: {module_name}")
|
||||
modulo = importlib.import_module(module_name)
|
||||
logger.debug(f"Worker {worker_id} - Funzione 'main_loader' caricata dal modulo {module_name}")
|
||||
logger.debug(f"Funzione 'main_loader' caricata dal modulo {module_name}")
|
||||
break
|
||||
except (ImportError, AttributeError) as e:
|
||||
logger.info(f"Worker {worker_id} - Modulo {module_name} non presente o non valido. {e}", exc_info=debug_mode)
|
||||
logger.debug(f"Modulo {module_name} non presente o non valido. {e}", exc_info=debug_mode)
|
||||
|
||||
if not modulo:
|
||||
logger.error(f"Worker {worker_id} - Nessun modulo trovato {module_names}")
|
||||
logger.error(f"Nessun modulo trovato {module_names}")
|
||||
return False
|
||||
|
||||
# Ottiene la funzione 'main_loader' dal modulo
|
||||
|
||||
funzione = getattr(modulo, "main_loader")
|
||||
|
||||
# Esegui la funzione
|
||||
|
||||
logger.info(f"Worker {worker_id} - Elaborazione con modulo {modulo} per ID={id}")
|
||||
logger.info(f"Elaborazione con modulo {modulo} per ID={id}")
|
||||
await funzione(cfg, id, pool)
|
||||
logger.info(f"Worker {worker_id} - Elaborazione completata per ID={id}")
|
||||
logger.info(f"Elaborazione completata per ID={id}")
|
||||
return True
|
||||
|
||||
|
||||
|
||||
async def main():
|
||||
"""Main function: avvia i worker e gestisce il ciclo principale."""
|
||||
logger.info("Avvio del sistema...")
|
||||
@@ -118,13 +126,19 @@ async def main():
|
||||
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
|
||||
debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
|
||||
|
||||
logging.basicConfig(
|
||||
format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
|
||||
filename=cfg.logfilename,
|
||||
level=log_level,
|
||||
# Configura il logging con il formatter personalizzato
|
||||
handler = logging.FileHandler(cfg.logfilename)
|
||||
formatter = WorkerFormatter(
|
||||
"%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s"
|
||||
)
|
||||
logger.info("Logging configurato correttamente")
|
||||
handler.setFormatter(formatter)
|
||||
|
||||
# Rimuovi eventuali handler esistenti e aggiungi il nostro
|
||||
logger.handlers.clear()
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(getattr(logging, log_level))
|
||||
|
||||
logger.info("Logging configurato correttamente")
|
||||
|
||||
# Numero massimo di worker concorrenti
|
||||
logger.info(f"Avvio di {cfg.max_threads} worker concorrenti")
|
||||
@@ -158,6 +172,5 @@ async def main():
|
||||
except Exception as e:
|
||||
logger.error(f"Errore principale: {e}", exc_info=debug_mode)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user