From 8cd5a21275dfcc7388fc3bc3feff167ad8c262c4 Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 15 Sep 2025 22:06:01 +0200 Subject: [PATCH] fix flag elab --- src/elab_orchestrator.py | 100 +++++++++++++++-------------- src/ftp_csv_receiver.py | 2 +- src/load_orchestrator.py | 33 ++++------ src/send_orchestrator.py | 4 +- src/utils/database/action_query.py | 2 +- 5 files changed, 70 insertions(+), 71 deletions(-) diff --git a/src/elab_orchestrator.py b/src/elab_orchestrator.py index 478c8f9..b66b0d4 100755 --- a/src/elab_orchestrator.py +++ b/src/elab_orchestrator.py @@ -10,7 +10,7 @@ import asyncio # Import custom modules for configuration and database connection from utils.config import loader_matlab_elab as setting from utils.database import WorkflowFlags -from utils.database.action_query import get_tool_info +from utils.database.action_query import get_tool_info, check_flag_elab from utils.csv.loaders import get_next_csv_atomic from utils.orchestrator_utils import run_orchestrator, worker_context from utils.database.loader_action import update_status, unlock @@ -47,61 +47,65 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None: while True: try: logger.info("Inizio elaborazione") - record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED) - if record: - rec_id, _, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record] - if tool_type.lower() != "gd": # i tool GD non devono essere elaborati ??? - tool_elab_info = await get_tool_info(WorkflowFlags.DATA_ELABORATED, unit_name.upper(), tool_name.upper(), pool) - if tool_elab_info: - if tool_elab_info['statustools'].lower() in cfg.elab_status: - logger.info("Elaborazione ID %s per %s %s", rec_id, unit_name, tool_name) - await update_status(cfg, rec_id, WorkflowFlags.START_ELAB, pool) - matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}" - proc = await asyncio.create_subprocess_shell( - matlab_cmd, - cwd=cfg.matlab_func_path, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) + if not await check_flag_elab(pool): + record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED) + if record: + rec_id, _, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record] + if tool_type.lower() != "gd": # i tool GD non devono essere elaborati ??? + tool_elab_info = await get_tool_info(WorkflowFlags.DATA_ELABORATED, unit_name.upper(), tool_name.upper(), pool) + if tool_elab_info: + if tool_elab_info['statustools'].lower() in cfg.elab_status: + logger.info("Elaborazione ID %s per %s %s", rec_id, unit_name, tool_name) + await update_status(cfg, rec_id, WorkflowFlags.START_ELAB, pool) + matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}" + proc = await asyncio.create_subprocess_shell( + matlab_cmd, + cwd=cfg.matlab_func_path, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) - stdout, stderr = await proc.communicate() + stdout, stderr = await proc.communicate() + + if proc.returncode != 0: + logger.error("Errore durante l'elaborazione") + logger.error(stderr.decode().strip()) + + if proc.returncode == 124: + error_type = f"Matlab elab excessive duration: killed after {cfg.matlab_timeout} seconds." + else: + error_type = f"Matlab elab failed: {proc.returncode}." + + # da verificare i log dove prenderli + # with open(f"{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt", "w") as f: + # f.write(stderr.decode().strip()) + # errors = [line for line in stderr.decode().strip() if line.startswith("Error")] + # warnings = [line for line in stderr.decode().strip() if not line.startswith("Error")] + + errors, warnings = await read_error_lines_from_logs(cfg.matlab_error_path, f"_{unit_name}_{tool_name}*_*_output_error.txt") + await send_error_email(unit_name.upper(), tool_name.upper(), tool_elab_info['matcall'], error_type, errors, warnings) - if proc.returncode != 0: - logger.error("Errore durante l'elaborazione") - logger.error(stderr.decode().strip()) - if proc.returncode == 124: - error_type = f"Matlab elab excessive duration: killed after {cfg.matlab_timeout} seconds." else: - error_type = f"Matlab elab failed: {proc.returncode}." - - # da verificare i log dove prenderli - # with open(f"{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt", "w") as f: - # f.write(stderr.decode().strip()) - # errors = [line for line in stderr.decode().strip() if line.startswith("Error")] - # warnings = [line for line in stderr.decode().strip() if not line.startswith("Error")] - - errors, warnings = await read_error_lines_from_logs(cfg.matlab_error_path, f"_{unit_name}_{tool_name}*_*_output_error.txt") - await send_error_email(unit_name.upper(), tool_name.upper(), tool_elab_info['matcall'], error_type, errors, warnings) - - + logger.info(stdout.decode().strip()) + await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool) + await unlock(cfg, rec_id, pool) + await asyncio.sleep(ELAB_PROCESSING_DELAY) else: - logger.info(stdout.decode().strip()) + logger.info("ID %s %s - %s %s: MatLab calc by-passed.", rec_id, unit_name, tool_name, tool_elab_info['statustools']) await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool) - await unlock(cfg, rec_id, pool) - await asyncio.sleep(ELAB_PROCESSING_DELAY) - else: - logger.info("ID %s %s - %s %s: MatLab calc by-passed.", rec_id, unit_name, tool_name, tool_elab_info['statustools']) - await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool) - await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool) - await unlock(cfg, rec_id, pool) - else: - await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool) - await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool) - await unlock(cfg, rec_id, pool) + await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool) + await unlock(cfg, rec_id, pool) + else: + await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool) + await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool) + await unlock(cfg, rec_id, pool) + else: + logger.info("Nessun record disponibile") + await asyncio.sleep(NO_RECORD_SLEEP) else: - logger.info("Nessun record disponibile") + logger.info("Flag fermo elaborazione attivato") await asyncio.sleep(NO_RECORD_SLEEP) except Exception as e: # pylint: disable=broad-except diff --git a/src/ftp_csv_receiver.py b/src/ftp_csv_receiver.py index d22538e..98ccf1d 100755 --- a/src/ftp_csv_receiver.py +++ b/src/ftp_csv_receiver.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) class DummySha256Authorizer(DummyAuthorizer): """Custom authorizer that uses SHA256 for password hashing and manages users from a database.""" - def __init__(self: object, cfg: object) -> None: + def __init__(self: object, cfg: dict) -> None: """Initializes the authorizer, adds the admin user, and loads users from the database. Args: diff --git a/src/load_orchestrator.py b/src/load_orchestrator.py index 1e1e3ad..6e34f9a 100755 --- a/src/load_orchestrator.py +++ b/src/load_orchestrator.py @@ -12,7 +12,6 @@ import asyncio from utils.config import loader_load_data as setting from utils.database import WorkflowFlags from utils.csv.loaders import get_next_csv_atomic -from utils.database.action_query import check_flag_elab from utils.orchestrator_utils import run_orchestrator, worker_context # Initialize the logger for this module @@ -24,7 +23,7 @@ CSV_PROCESSING_DELAY = 0.2 NO_RECORD_SLEEP = 60 -async def worker(worker_id: int, cfg: object, pool: object) -> None: +async def worker(worker_id: int, cfg: dict, pool: object) -> None: """Esegue il ciclo di lavoro per l'elaborazione dei file CSV. Il worker preleva un record CSV dal database, ne elabora il contenuto @@ -32,7 +31,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None: Args: worker_id (int): L'ID univoco del worker. - cfg (object): L'oggetto di configurazione. + cfg (dict): L'oggetto di configurazione. pool (object): Il pool di connessioni al database. """ # Imposta il context per questo worker @@ -43,24 +42,20 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None: while True: try: logger.info("Inizio elaborazione") - if not await check_flag_elab(): - record = await get_next_csv_atomic( - pool, - cfg.dbrectable, - WorkflowFlags.CSV_RECEIVED, - WorkflowFlags.DATA_LOADED, - ) + record = await get_next_csv_atomic( + pool, + cfg.dbrectable, + WorkflowFlags.CSV_RECEIVED, + WorkflowFlags.DATA_LOADED, + ) - if record: - success = await load_csv(record, cfg, pool) - if not success: - logger.error("Errore durante l'elaborazione") - await asyncio.sleep(CSV_PROCESSING_DELAY) - else: - logger.info("Nessun record disponibile") - await asyncio.sleep(NO_RECORD_SLEEP) + if record: + success = await load_csv(record, cfg, pool) + if not success: + logger.error("Errore durante l'elaborazione") + await asyncio.sleep(CSV_PROCESSING_DELAY) else: - logger.info("Flag fermo elaborazione attivato") + logger.info("Nessun record disponibile") await asyncio.sleep(NO_RECORD_SLEEP) except Exception as e: # pylint: disable=broad-except diff --git a/src/send_orchestrator.py b/src/send_orchestrator.py index fdd7ca1..2d86739 100755 --- a/src/send_orchestrator.py +++ b/src/send_orchestrator.py @@ -28,7 +28,7 @@ ELAB_PROCESSING_DELAY = 0.2 NO_RECORD_SLEEP = 30 -async def worker(worker_id: int, cfg: object, pool: object) -> None: +async def worker(worker_id: int, cfg: dict, pool: object) -> None: """Esegue il ciclo di lavoro per l'invio dei dati. Il worker preleva un record dal database che indica dati pronti per @@ -37,7 +37,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None: Args: worker_id (int): L'ID univoco del worker. - cfg (object): L'oggetto di configurazione. + cfg (dict): L'oggetto di configurazione. pool (object): Il pool di connessioni al database. """ diff --git a/src/utils/database/action_query.py b/src/utils/database/action_query.py index 2de359f..1206228 100644 --- a/src/utils/database/action_query.py +++ b/src/utils/database/action_query.py @@ -138,7 +138,7 @@ async def check_flag_elab(pool: object) -> None: async with pool.acquire() as conn: async with conn.cursor() as cur: try: - await cur.execute("SELECT ferma_elab from admin_panel") + await cur.execute("SELECT stop_elab from admin_panel") results = await cur.fetchone() return results[0]