from utils.database.loader_action import load_data, update_status, unlock from utils.database import WorkflowFlags from utils.csv.data_preparation import make_pipe_sep_matrix, make_ain_din_matrix, make_channels_matrix, make_tlp_matrix, make_gd_matrix, make_musa_matrix import logging logger = logging.getLogger(__name__) async def main_loader(cfg: object, id: int, pool: object, action: str) -> None: """ Main loader function to process CSV data based on the specified action. Args: cfg (object): Configuration object. id (int): The ID of the CSV record to process. pool (object): The database connection pool. action (str): The type of data processing to perform (e.g., "pipe_separator", "analogic_digital"). """ type_matrix_mapping = { "pipe_separator": make_pipe_sep_matrix, "analogic_digital": make_ain_din_matrix, "channels": make_channels_matrix, "tlp": make_tlp_matrix, "gd": make_gd_matrix, "musa": make_musa_matrix } if action in type_matrix_mapping: function_to_call = type_matrix_mapping[action] # Create a matrix of values from the data matrice_valori = await function_to_call(cfg, id, pool) logger.info("matrice valori creata") # Load the data into the database if await load_data(cfg, matrice_valori, pool): await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool) await unlock(cfg, id, pool) else: logger.warning(f"Action '{action}' non riconosciuta.") async def get_next_csv_atomic(pool, table_name, status, next_status): """Preleva atomicamente il prossimo CSV da elaborare""" async with pool.acquire() as conn: # IMPORTANTE: Disabilita autocommit per questa transazione await conn.begin() try: async with conn.cursor() as cur: # Usa SELECT FOR UPDATE per lock atomico await cur.execute(f""" SELECT id, unit_type, tool_type, unit_name, tool_name FROM {table_name} WHERE locked = 0 AND ((status & %s) > 0 OR %s = 0) AND (status & %s) = 0 ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED """, (status, status, next_status)) result = await cur.fetchone() if result: await cur.execute(f""" UPDATE {table_name} SET locked = 1 WHERE id = %s """, (result[0],)) # Commit esplicito per rilasciare il lock await conn.commit() return result except Exception as e: # Rollback in caso di errore await conn.rollback() raise e