fix flag elab

This commit is contained in:
2025-09-15 22:06:01 +02:00
parent 2d2668c92c
commit 8cd5a21275
5 changed files with 70 additions and 71 deletions

View File

@@ -10,7 +10,7 @@ import asyncio
# Import custom modules for configuration and database connection # Import custom modules for configuration and database connection
from utils.config import loader_matlab_elab as setting from utils.config import loader_matlab_elab as setting
from utils.database import WorkflowFlags from utils.database import WorkflowFlags
from utils.database.action_query import get_tool_info from utils.database.action_query import get_tool_info, check_flag_elab
from utils.csv.loaders import get_next_csv_atomic from utils.csv.loaders import get_next_csv_atomic
from utils.orchestrator_utils import run_orchestrator, worker_context from utils.orchestrator_utils import run_orchestrator, worker_context
from utils.database.loader_action import update_status, unlock from utils.database.loader_action import update_status, unlock
@@ -47,61 +47,65 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
while True: while True:
try: try:
logger.info("Inizio elaborazione") logger.info("Inizio elaborazione")
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED) if not await check_flag_elab(pool):
if record: record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED)
rec_id, _, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record] if record:
if tool_type.lower() != "gd": # i tool GD non devono essere elaborati ??? rec_id, _, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
tool_elab_info = await get_tool_info(WorkflowFlags.DATA_ELABORATED, unit_name.upper(), tool_name.upper(), pool) if tool_type.lower() != "gd": # i tool GD non devono essere elaborati ???
if tool_elab_info: tool_elab_info = await get_tool_info(WorkflowFlags.DATA_ELABORATED, unit_name.upper(), tool_name.upper(), pool)
if tool_elab_info['statustools'].lower() in cfg.elab_status: if tool_elab_info:
logger.info("Elaborazione ID %s per %s %s", rec_id, unit_name, tool_name) if tool_elab_info['statustools'].lower() in cfg.elab_status:
await update_status(cfg, rec_id, WorkflowFlags.START_ELAB, pool) logger.info("Elaborazione ID %s per %s %s", rec_id, unit_name, tool_name)
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}" await update_status(cfg, rec_id, WorkflowFlags.START_ELAB, pool)
proc = await asyncio.create_subprocess_shell( matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}"
matlab_cmd, proc = await asyncio.create_subprocess_shell(
cwd=cfg.matlab_func_path, matlab_cmd,
stdout=asyncio.subprocess.PIPE, cwd=cfg.matlab_func_path,
stderr=asyncio.subprocess.PIPE stdout=asyncio.subprocess.PIPE,
) stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate() stdout, stderr = await proc.communicate()
if proc.returncode != 0:
logger.error("Errore durante l'elaborazione")
logger.error(stderr.decode().strip())
if proc.returncode == 124:
error_type = f"Matlab elab excessive duration: killed after {cfg.matlab_timeout} seconds."
else:
error_type = f"Matlab elab failed: {proc.returncode}."
# da verificare i log dove prenderli
# with open(f"{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt", "w") as f:
# f.write(stderr.decode().strip())
# errors = [line for line in stderr.decode().strip() if line.startswith("Error")]
# warnings = [line for line in stderr.decode().strip() if not line.startswith("Error")]
errors, warnings = await read_error_lines_from_logs(cfg.matlab_error_path, f"_{unit_name}_{tool_name}*_*_output_error.txt")
await send_error_email(unit_name.upper(), tool_name.upper(), tool_elab_info['matcall'], error_type, errors, warnings)
if proc.returncode != 0:
logger.error("Errore durante l'elaborazione")
logger.error(stderr.decode().strip())
if proc.returncode == 124:
error_type = f"Matlab elab excessive duration: killed after {cfg.matlab_timeout} seconds."
else: else:
error_type = f"Matlab elab failed: {proc.returncode}." logger.info(stdout.decode().strip())
await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
# da verificare i log dove prenderli await unlock(cfg, rec_id, pool)
# with open(f"{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt", "w") as f: await asyncio.sleep(ELAB_PROCESSING_DELAY)
# f.write(stderr.decode().strip())
# errors = [line for line in stderr.decode().strip() if line.startswith("Error")]
# warnings = [line for line in stderr.decode().strip() if not line.startswith("Error")]
errors, warnings = await read_error_lines_from_logs(cfg.matlab_error_path, f"_{unit_name}_{tool_name}*_*_output_error.txt")
await send_error_email(unit_name.upper(), tool_name.upper(), tool_elab_info['matcall'], error_type, errors, warnings)
else: else:
logger.info(stdout.decode().strip()) logger.info("ID %s %s - %s %s: MatLab calc by-passed.", rec_id, unit_name, tool_name, tool_elab_info['statustools'])
await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool) await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
await unlock(cfg, rec_id, pool) await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool)
await asyncio.sleep(ELAB_PROCESSING_DELAY) await unlock(cfg, rec_id, pool)
else: else:
logger.info("ID %s %s - %s %s: MatLab calc by-passed.", rec_id, unit_name, tool_name, tool_elab_info['statustools']) await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool) await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool)
await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool) await unlock(cfg, rec_id, pool)
await unlock(cfg, rec_id, pool)
else:
await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool)
await unlock(cfg, rec_id, pool)
else:
logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP)
else: else:
logger.info("Nessun record disponibile") logger.info("Flag fermo elaborazione attivato")
await asyncio.sleep(NO_RECORD_SLEEP) await asyncio.sleep(NO_RECORD_SLEEP)
except Exception as e: # pylint: disable=broad-except except Exception as e: # pylint: disable=broad-except

View File

@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
class DummySha256Authorizer(DummyAuthorizer): class DummySha256Authorizer(DummyAuthorizer):
"""Custom authorizer that uses SHA256 for password hashing and manages users from a database.""" """Custom authorizer that uses SHA256 for password hashing and manages users from a database."""
def __init__(self: object, cfg: object) -> None: def __init__(self: object, cfg: dict) -> None:
"""Initializes the authorizer, adds the admin user, and loads users from the database. """Initializes the authorizer, adds the admin user, and loads users from the database.
Args: Args:

View File

@@ -12,7 +12,6 @@ import asyncio
from utils.config import loader_load_data as setting from utils.config import loader_load_data as setting
from utils.database import WorkflowFlags from utils.database import WorkflowFlags
from utils.csv.loaders import get_next_csv_atomic from utils.csv.loaders import get_next_csv_atomic
from utils.database.action_query import check_flag_elab
from utils.orchestrator_utils import run_orchestrator, worker_context from utils.orchestrator_utils import run_orchestrator, worker_context
# Initialize the logger for this module # Initialize the logger for this module
@@ -24,7 +23,7 @@ CSV_PROCESSING_DELAY = 0.2
NO_RECORD_SLEEP = 60 NO_RECORD_SLEEP = 60
async def worker(worker_id: int, cfg: object, pool: object) -> None: async def worker(worker_id: int, cfg: dict, pool: object) -> None:
"""Esegue il ciclo di lavoro per l'elaborazione dei file CSV. """Esegue il ciclo di lavoro per l'elaborazione dei file CSV.
Il worker preleva un record CSV dal database, ne elabora il contenuto Il worker preleva un record CSV dal database, ne elabora il contenuto
@@ -32,7 +31,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
Args: Args:
worker_id (int): L'ID univoco del worker. worker_id (int): L'ID univoco del worker.
cfg (object): L'oggetto di configurazione. cfg (dict): L'oggetto di configurazione.
pool (object): Il pool di connessioni al database. pool (object): Il pool di connessioni al database.
""" """
# Imposta il context per questo worker # Imposta il context per questo worker
@@ -43,24 +42,20 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
while True: while True:
try: try:
logger.info("Inizio elaborazione") logger.info("Inizio elaborazione")
if not await check_flag_elab(): record = await get_next_csv_atomic(
record = await get_next_csv_atomic( pool,
pool, cfg.dbrectable,
cfg.dbrectable, WorkflowFlags.CSV_RECEIVED,
WorkflowFlags.CSV_RECEIVED, WorkflowFlags.DATA_LOADED,
WorkflowFlags.DATA_LOADED, )
)
if record: if record:
success = await load_csv(record, cfg, pool) success = await load_csv(record, cfg, pool)
if not success: if not success:
logger.error("Errore durante l'elaborazione") logger.error("Errore durante l'elaborazione")
await asyncio.sleep(CSV_PROCESSING_DELAY) await asyncio.sleep(CSV_PROCESSING_DELAY)
else:
logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP)
else: else:
logger.info("Flag fermo elaborazione attivato") logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP) await asyncio.sleep(NO_RECORD_SLEEP)
except Exception as e: # pylint: disable=broad-except except Exception as e: # pylint: disable=broad-except

View File

@@ -28,7 +28,7 @@ ELAB_PROCESSING_DELAY = 0.2
NO_RECORD_SLEEP = 30 NO_RECORD_SLEEP = 30
async def worker(worker_id: int, cfg: object, pool: object) -> None: async def worker(worker_id: int, cfg: dict, pool: object) -> None:
"""Esegue il ciclo di lavoro per l'invio dei dati. """Esegue il ciclo di lavoro per l'invio dei dati.
Il worker preleva un record dal database che indica dati pronti per Il worker preleva un record dal database che indica dati pronti per
@@ -37,7 +37,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
Args: Args:
worker_id (int): L'ID univoco del worker. worker_id (int): L'ID univoco del worker.
cfg (object): L'oggetto di configurazione. cfg (dict): L'oggetto di configurazione.
pool (object): Il pool di connessioni al database. pool (object): Il pool di connessioni al database.
""" """

View File

@@ -138,7 +138,7 @@ async def check_flag_elab(pool: object) -> None:
async with pool.acquire() as conn: async with pool.acquire() as conn:
async with conn.cursor() as cur: async with conn.cursor() as cur:
try: try:
await cur.execute("SELECT ferma_elab from admin_panel") await cur.execute("SELECT stop_elab from admin_panel")
results = await cur.fetchone() results = await cur.fetchone()
return results[0] return results[0]