From 726d04ace37af4c27f83362d80af6ff46e61d72b Mon Sep 17 00:00:00 2001
From: alex
Date: Fri, 13 Jun 2025 08:34:59 +0200
Subject: [PATCH] rimosso autocommit nei pool

---
 load_orchestrator.py | 60 ++++++++++++++++++++++++++------------------
 1 file changed, 35 insertions(+), 25 deletions(-)

diff --git a/load_orchestrator.py b/load_orchestrator.py
index 288a8cb..c8e00a4 100755
--- a/load_orchestrator.py
+++ b/load_orchestrator.py
@@ -32,27 +32,37 @@ NO_RECORD_SLEEP = 60
 async def get_next_csv_atomic(pool, table_name):
     """Preleva atomicamente il prossimo CSV da elaborare"""
     async with pool.acquire() as conn:
-        async with conn.cursor() as cur:
-            # Usa SELECT FOR UPDATE per lock atomico
-            await cur.execute(f"""
-                SELECT id, unit_type, tool_type, unit_name, tool_name
-                FROM {table_name}
-                WHERE locked = 0 AND status = %s
-                ORDER BY id
-                LIMIT 1
-                FOR UPDATE SKIP LOCKED
-            """, (CSV_RECEIVED,))
+        # IMPORTANTE: Disabilita autocommit per questa transazione
+        await conn.begin()
 
-            result = await cur.fetchone()
-            if result:
+        try:
+            async with conn.cursor() as cur:
+                # Usa SELECT FOR UPDATE per lock atomico
                 await cur.execute(f"""
-                    UPDATE {table_name}
-                    SET locked = 1
-                    WHERE id = %s
-                """, (result[0],))
+                    SELECT id, unit_type, tool_type, unit_name, tool_name
+                    FROM {table_name}
+                    WHERE locked = 0 AND status = %s
+                    ORDER BY id
+                    LIMIT 1
+                    FOR UPDATE SKIP LOCKED
+                """, (CSV_RECEIVED,))
 
-            await conn.commit()
-            return result
+                result = await cur.fetchone()
+                if result:
+                    await cur.execute(f"""
+                        UPDATE {table_name}
+                        SET locked = 1
+                        WHERE id = %s
+                    """, (result[0],))
+
+                # Commit esplicito per rilasciare il lock
+                await conn.commit()
+                return result
+
+        except Exception as e:
+            # Rollback in caso di errore
+            await conn.rollback()
+            raise e
 
 async def worker(worker_id: int, cfg: object, pool) -> None:
     # Imposta il context per questo worker
@@ -73,6 +83,7 @@ async def worker(worker_id: int, cfg: object, pool) -> None:
                 logger.error("Errore durante l'elaborazione")
                 await asyncio.sleep(CSV_PROCESSING_DELAY)
             else:
+                logger.debug("Nessun record disponibile")
                 await asyncio.sleep(NO_RECORD_SLEEP)
 
         except Exception as e:
@@ -84,13 +95,13 @@ async def load_csv(record: tuple, cfg: object, pool) -> bool:
 
     logger.debug("Inizio ricerca nuovo CSV da elaborare")
     id, unit_type, tool_type, unit_name, tool_name = record
-    logger.info(f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}")
+    logger.info(f'Trovato CSV da elaborare: ID={id}, Tipo={unit_type.lower().replace(" ", "_")}_{tool_type.lower().replace(" ", "_")}, Nome={unit_name.lower().replace(" ", "_")}_{tool_name.lower().replace(" ", "_")}')
 
     # Costruisce il nome del modulo da caricare dinamicamente
-    module_names = [f'utils.parsers.by_name.{unit_name.lower()}_{tool_name.lower()}',
-                    f'utils.parsers.by_name.{unit_name.lower()}_{tool_type.lower()}',
-                    f'utils.parsers.by_name.{unit_name.lower()}_all',
-                    f'utils.parsers.by_type.{unit_type.lower()}_{tool_type.lower()}']
+    module_names = [f'utils.parsers.by_name.{unit_name.lower().replace(" ", "_")}_{tool_name.lower().replace(" ","_")}',
+                    f'utils.parsers.by_name.{unit_name.lower().replace(" ", "_")}_{tool_type.lower().replace(" ","_")}',
+                    f'utils.parsers.by_name.{unit_name.lower().replace(" ", "_")}_all',
+                    f'utils.parsers.by_type.{unit_type.lower().replace(" ", "_")}_{tool_type.lower().replace(" ","_")}']
     modulo = None
     for module_name in module_names:
         try:
@@ -150,8 +161,7 @@ async def main():
         db=cfg.dbname,
         minsize=1,
         maxsize=cfg.max_threads*4,
-        pool_recycle=3600,
-        autocommit=True
+        pool_recycle=3600
     )
 
     # Avvia i worker