elab matlab
This commit is contained in:
@@ -12,6 +12,8 @@ import contextvars
|
||||
from utils.config import loader_load_data as setting
|
||||
from utils.database import CSV_RECEIVED
|
||||
|
||||
from utils.csv.loaders import get_next_csv_atomic
|
||||
|
||||
# Crea una context variable per identificare il worker
|
||||
worker_context = contextvars.ContextVar('worker_id', default='00')
|
||||
|
||||
@@ -29,41 +31,6 @@ CSV_PROCESSING_DELAY = 0.2
|
||||
# Tempo di attesa se non ci sono record da elaborare
|
||||
NO_RECORD_SLEEP = 60
|
||||
|
||||
async def get_next_csv_atomic(pool, table_name):
|
||||
"""Preleva atomicamente il prossimo CSV da elaborare"""
|
||||
async with pool.acquire() as conn:
|
||||
# IMPORTANTE: Disabilita autocommit per questa transazione
|
||||
await conn.begin()
|
||||
|
||||
try:
|
||||
async with conn.cursor() as cur:
|
||||
# Usa SELECT FOR UPDATE per lock atomico
|
||||
await cur.execute(f"""
|
||||
SELECT id, unit_type, tool_type, unit_name, tool_name
|
||||
FROM {table_name}
|
||||
WHERE locked = 0 AND status = %s
|
||||
ORDER BY id
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
""", (CSV_RECEIVED,))
|
||||
|
||||
result = await cur.fetchone()
|
||||
if result:
|
||||
await cur.execute(f"""
|
||||
UPDATE {table_name}
|
||||
SET locked = 1
|
||||
WHERE id = %s
|
||||
""", (result[0],))
|
||||
|
||||
# Commit esplicito per rilasciare il lock
|
||||
await conn.commit()
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
# Rollback in caso di errore
|
||||
await conn.rollback()
|
||||
raise e
|
||||
|
||||
async def worker(worker_id: int, cfg: object, pool) -> None:
|
||||
# Imposta il context per questo worker
|
||||
worker_context.set(f"W{worker_id}")
|
||||
@@ -75,7 +42,7 @@ async def worker(worker_id: int, cfg: object, pool) -> None:
|
||||
try:
|
||||
logger.info("Inizio elaborazione")
|
||||
|
||||
record = await get_next_csv_atomic(pool, cfg.dbrectable)
|
||||
record = await get_next_csv_atomic(pool, cfg.dbrectable, CSV_RECEIVED)
|
||||
|
||||
if record:
|
||||
success = await load_csv(record, cfg, pool)
|
||||
|
||||
Reference in New Issue
Block a user