import asyncio
import logging
import os
import tempfile

from utils.csv.data_preparation import (
    get_data,
    make_ain_din_matrix,
    make_channels_matrix,
    make_gd_matrix,
    make_musa_matrix,
    make_pipe_sep_matrix,
    make_tlp_matrix,
)
from utils.database import WorkflowFlags
from utils.database.loader_action import load_data, unlock, update_status

logger = logging.getLogger(__name__)


async def main_loader(cfg: object, id: int, pool: object, action: str) -> None:
    """
    Main loader function to process CSV data based on the specified action.

    The action selects one of the matrix-building functions; the resulting
    matrix is passed to ``load_data``. Only when ``load_data`` returns a truthy
    value is the record flagged as ``WorkflowFlags.DATA_LOADED`` and unlocked.
    An unrecognized action is logged as a warning and nothing else is done.

    Args:
        cfg (object): Configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
        action (str): The type of data processing to perform
            (e.g., "pipe_separator", "analogic_digital").
    """
    type_matrix_mapping = {
        "pipe_separator": make_pipe_sep_matrix,
        "analogic_digital": make_ain_din_matrix,
        "channels": make_channels_matrix,
        "tlp": make_tlp_matrix,
        "gd": make_gd_matrix,
        "musa": make_musa_matrix,
    }

    function_to_call = type_matrix_mapping.get(action)
    if function_to_call is None:
        logger.warning("Action '%s' non riconosciuta.", action)
        return

    # Create a matrix of values from the data
    matrice_valori = await function_to_call(cfg, id, pool)
    logger.info("matrice valori creata")

    # Load the data into the database
    if await load_data(cfg, matrice_valori, pool, type=action):
        await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
        await unlock(cfg, id, pool)


async def get_next_csv_atomic(pool: object, table_name: str, status: int, next_status: int) -> tuple:
    """
    Retrieves the next available CSV record for processing in an atomic manner.

    This function acquires a database connection from the pool, begins a
    transaction, and attempts to select and lock a single record from the
    specified table that matches the given status and has not yet reached the
    next_status. It uses `SELECT FOR UPDATE SKIP LOCKED` to ensure atomicity
    and prevent other workers from processing the same record concurrently.
    When a record is found, its ``locked`` column is set to 1 before the
    transaction is committed.

    Args:
        pool (object): The database connection pool.
        table_name (str): The name of the table to query. It is interpolated
            directly into the SQL text, so it must be a trusted identifier.
        status (int): The current status flag that the record must have.
            A value of 0 disables this filter.
        next_status (int): The status flag that the record should NOT have yet.

    Returns:
        tuple: The next available received record
            (id, unit_type, tool_type, unit_name, tool_name) if found,
            otherwise None.

    Raises:
        Exception: Any error raised while querying or committing is re-raised
            after the transaction has been rolled back.
    """
    async with pool.acquire() as conn:
        # IMPORTANT: open an explicit transaction (intended to bypass
        # autocommit) so the row lock is held until the commit below.
        await conn.begin()
        try:
            async with conn.cursor() as cur:
                # Use SELECT ... FOR UPDATE for an atomic lock
                await cur.execute(
                    f"""
                    SELECT id, unit_type, tool_type, unit_name, tool_name
                    FROM {table_name}
                    WHERE locked = 0
                    AND ((status & %s) > 0 OR %s = 0)
                    AND (status & %s) = 0
                    ORDER BY id
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                    """,
                    (status, status, next_status),
                )
                result = await cur.fetchone()

                if result:
                    await cur.execute(
                        f"""
                        UPDATE {table_name}
                        SET locked = 1
                        WHERE id = %s
                        """,
                        (result[0],),
                    )

                # Explicit commit to release the row lock
                await conn.commit()
                return result
        except Exception:
            # Roll back on any error, then propagate with the original traceback
            await conn.rollback()
            raise


async def main_old_script_loader(cfg: object, id: int, pool: object, script_name: str) -> None:
    """
    This function retrieves CSV data, writes it to a temporary file, executes
    an external Python script to process it, and then updates the workflow
    status in the database.

    The temporary file is always removed, whether writing it, launching the
    script, or waiting for it fails or succeeds. On success the record is
    flagged ``DATA_LOADED`` and ``DATA_ELABORATED`` and then unlocked.

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
        script_name (str): The name of the script to execute
            (without the .py extension).

    Raises:
        Exception: If the external script exits with a non-zero return code;
            the message carries the script's stderr output.
    """
    filename, _unit_name, _tool_name_id, tool_data = await get_data(cfg, id, pool)

    # Only the final path component may be used as a prefix: a path separator
    # in it would make tempfile target a directory other than the temp dir.
    prefix = os.path.basename(filename) if filename is not None else None

    # Create the temporary file (delete=False so the child process can open it)
    temp_file = tempfile.NamedTemporaryFile(mode="w", prefix=prefix, suffix=".csv", delete=False)
    temp_filename = temp_file.name
    try:
        # Writing happens inside the try so a failed write cannot leak the file
        with temp_file:
            temp_file.write(tool_data)

        # Use asyncio.subprocess so the event loop is not blocked
        process = await asyncio.create_subprocess_exec(
            "python3",
            f"old_scripts/{script_name}.py",
            temp_filename,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
    finally:
        # Clean up the temporary file
        os.unlink(temp_filename)

    # The output is diagnostic only: never let undecodable bytes hide the
    # real outcome of the script.
    result_stdout = stdout.decode("utf-8", errors="replace")
    result_stderr = stderr.decode("utf-8", errors="replace")

    if process.returncode != 0:
        logger.error("Errore nell'esecuzione del programma %s.py: %s", script_name, result_stderr)
        raise Exception(f"Errore nel programma: {result_stderr}")

    logger.info("Programma %s.py eseguito con successo.", script_name)
    logger.debug("Stdout: %s", result_stdout)
    await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
    await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
    await unlock(cfg, id, pool)