rimoso autocommit nei pool

This commit is contained in:
2025-06-13 08:34:59 +02:00
parent 40a1ac23ee
commit 726d04ace3

View File

@@ -32,27 +32,37 @@ NO_RECORD_SLEEP = 60
async def get_next_csv_atomic(pool, table_name): async def get_next_csv_atomic(pool, table_name):
"""Preleva atomicamente il prossimo CSV da elaborare""" """Preleva atomicamente il prossimo CSV da elaborare"""
async with pool.acquire() as conn: async with pool.acquire() as conn:
async with conn.cursor() as cur: # IMPORTANTE: Disabilita autocommit per questa transazione
# Usa SELECT FOR UPDATE per lock atomico await conn.begin()
await cur.execute(f"""
SELECT id, unit_type, tool_type, unit_name, tool_name
FROM {table_name}
WHERE locked = 0 AND status = %s
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED
""", (CSV_RECEIVED,))
result = await cur.fetchone() try:
if result: async with conn.cursor() as cur:
# Usa SELECT FOR UPDATE per lock atomico
await cur.execute(f""" await cur.execute(f"""
UPDATE {table_name} SELECT id, unit_type, tool_type, unit_name, tool_name
SET locked = 1 FROM {table_name}
WHERE id = %s WHERE locked = 0 AND status = %s
""", (result[0],)) ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED
""", (CSV_RECEIVED,))
await conn.commit() result = await cur.fetchone()
return result if result:
await cur.execute(f"""
UPDATE {table_name}
SET locked = 1
WHERE id = %s
""", (result[0],))
# Commit esplicito per rilasciare il lock
await conn.commit()
return result
except Exception as e:
# Rollback in caso di errore
await conn.rollback()
raise e
async def worker(worker_id: int, cfg: object, pool) -> None: async def worker(worker_id: int, cfg: object, pool) -> None:
# Imposta il context per questo worker # Imposta il context per questo worker
@@ -73,6 +83,7 @@ async def worker(worker_id: int, cfg: object, pool) -> None:
logger.error("Errore durante l'elaborazione") logger.error("Errore durante l'elaborazione")
await asyncio.sleep(CSV_PROCESSING_DELAY) await asyncio.sleep(CSV_PROCESSING_DELAY)
else: else:
logger.debug("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP) await asyncio.sleep(NO_RECORD_SLEEP)
except Exception as e: except Exception as e:
@@ -84,13 +95,13 @@ async def load_csv(record: tuple, cfg: object, pool) -> bool:
logger.debug("Inizio ricerca nuovo CSV da elaborare") logger.debug("Inizio ricerca nuovo CSV da elaborare")
id, unit_type, tool_type, unit_name, tool_name = record id, unit_type, tool_type, unit_name, tool_name = record
logger.info(f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}") logger.info(f'Trovato CSV da elaborare: ID={id}, Tipo={unit_type.lower().replace(" ", "_")}_{tool_type.lower().replace(" ", "_")}, Nome={unit_name.lower().replace(" ", "_")}_{tool_name.lower().replace(" ", "_")}')
# Costruisce il nome del modulo da caricare dinamicamente # Costruisce il nome del modulo da caricare dinamicamente
module_names = [f'utils.parsers.by_name.{unit_name.lower()}_{tool_name.lower()}', module_names = [f'utils.parsers.by_name.{unit_name.lower().replace(" ", "_")}_{tool_name.lower().replace(" ","_")}',
f'utils.parsers.by_name.{unit_name.lower()}_{tool_type.lower()}', f'utils.parsers.by_name.{unit_name.lower().replace(" ", "_")}_{tool_type.lower().replace(" ","_")}',
f'utils.parsers.by_name.{unit_name.lower()}_all', f'utils.parsers.by_name.{unit_name.lower().replace(" ", "_")}_all',
f'utils.parsers.by_type.{unit_type.lower()}_{tool_type.lower()}'] f'utils.parsers.by_type.{unit_type.lower().replace(" ", "_")}_{tool_type.lower().replace(" ","_")}']
modulo = None modulo = None
for module_name in module_names: for module_name in module_names:
try: try:
@@ -150,8 +161,7 @@ async def main():
db=cfg.dbname, db=cfg.dbname,
minsize=1, minsize=1,
maxsize=cfg.max_threads*4, maxsize=cfg.max_threads*4,
pool_recycle=3600, pool_recycle=3600
autocommit=True
) )
# Avvia i worker # Avvia i worker