diff --git a/elab_orchestrator.py b/elab_orchestrator.py index 56a8b3f..45477ac 100755 --- a/elab_orchestrator.py +++ b/elab_orchestrator.py @@ -48,22 +48,23 @@ async def worker(worker_id: int, cfg: object, pool) -> None: if record: id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record] - matlab_info = await get_matlab_command(cfg, tool_name, unit_name) - matlab_cmd = f"timeout {cfg.timeout} ./run_{matlab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name} {tool_name}" + matlab_info = await get_matlab_command(cfg, tool_name, unit_name, pool) + if matlab_info: + matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{matlab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name} {tool_name}" - # matlab_error_filename = f'{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt' + # matlab_error_filename = f'{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt' - success = await subprocess.run(matlab_cmd, - cwd=cfg.matlab_func_path, - capture_output=True, - text=True, - check=True) + success = await subprocess.run(matlab_cmd, + cwd=cfg.matlab_func_path, + capture_output=True, + text=True, + check=True) - if not success: - logger.error("Errore durante l'elaborazione") - await asyncio.sleep(ELAB_PROCESSING_DELAY) + if not success: + logger.error("Errore durante l'elaborazione") + await asyncio.sleep(ELAB_PROCESSING_DELAY) else: - logger.debug("Nessun record disponibile") + logger.info("Nessun record disponibile") await asyncio.sleep(NO_RECORD_SLEEP) except Exception as e: diff --git a/env/elab.ini b/env/elab.ini index fedad1a..5a6ccb6 100644 --- a/env/elab.ini +++ b/env/elab.ini @@ -1,14 +1,14 @@ [logging] - logFilename = ./load_raw_data.log + logFilename = ./elab_data.log [threads] - max_num = 20 + max_num = 10 [matlab] - runtime = "/usr/local/MATLAB/MATLAB_Runtime/v93" - func_path = "/usr/local/matlab_func/" + runtime = /usr/local/MATLAB/MATLAB_Runtime/v93 + func_path = /usr/local/matlab_func/ timeout = 1800 error = "" - error_path = "/tmp/" + error_path = /tmp/ diff --git a/load_orchestrator.py b/load_orchestrator.py index c5afc4d..52e7997 100755 --- a/load_orchestrator.py +++ b/load_orchestrator.py @@ -50,7 +50,7 @@ async def worker(worker_id: int, cfg: object, pool) -> None: logger.error("Errore durante l'elaborazione") await asyncio.sleep(CSV_PROCESSING_DELAY) else: - logger.debug("Nessun record disponibile") + logger.info("Nessun record disponibile") await asyncio.sleep(NO_RECORD_SLEEP) except Exception as e: diff --git a/utils/database/connection.py b/utils/database/connection.py index fbd29b1..a48e395 100644 --- a/utils/database/connection.py +++ b/utils/database/connection.py @@ -3,7 +3,7 @@ import mysql.connector logger = logging.getLogger(__name__) -def connetti_db(cfg: object) -> object: +def nnn_connetti_db(cfg: object) -> object: """ Establishes a connection to a MySQL database. diff --git a/utils/database/matlab_query.py b/utils/database/matlab_query.py index 0f90b6e..dc0a23d 100644 --- a/utils/database/matlab_query.py +++ b/utils/database/matlab_query.py @@ -1,27 +1,24 @@ -from utils.database.connection import connetti_db import logging +import aiomysql logger = logging.getLogger(__name__) -def get_matlab_command(cfg: object, tool: str, unit: str) -> tuple: +async def get_matlab_command(cfg: object, tool: str, unit: str, pool) -> tuple: - with connetti_db(cfg) as conn: - cur = conn.cursor(dictionary=True) - query = f""" - SELECT m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate from matfuncs as m - INNER JOIN tools as t on t.matfunc = m.id - INNER JOIN units as u on u.id = t.unit_id - INNER JOIN statustools as s on t.statustool_id = s.id - where t.name = '{tool}' AND u.name = '{unit}'; + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(f""" + SELECT m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate from matfuncs as m + INNER JOIN tools as t on t.matfunc = m.id + INNER JOIN units as u on u.id = t.unit_id + INNER JOIN statustools as s on t.statustool_id = s.id + where t.name = '{tool}' AND u.name = '{unit}'; + """) - """ - cur.execute(query) - result = cur.fetchone() - cur.close() - conn.close() + result = await cur.fetchone() - if not result: - logger.error(f"{unit} - {tool}: Matlab command not found.") - return None - else: - return result \ No newline at end of file + if not result: + logger.error(f"{unit} - {tool}: Matlab command not found.") + return None + else: + return result \ No newline at end of file diff --git a/utils/database/nodes_query.py b/utils/database/nodes_query.py index ceaa3cd..2732208 100644 --- a/utils/database/nodes_query.py +++ b/utils/database/nodes_query.py @@ -1,37 +1,36 @@ -from utils.database.connection import connetti_db + +import aiomysql import logging logger = logging.getLogger(__name__) -def get_nodes_type(cfg: object, tool: str, unit: str) -> tuple: +async def get_nodes_type(cfg: object, tool: str, unit: str, pool) -> tuple: - with connetti_db(cfg) as conn: - cur = conn.cursor(dictionary=True) - query = f""" - SELECT t.name AS name, n.seq AS seq, n.num AS num, n.channels AS channels, y.type AS type, n.ain AS ain, n.din AS din - FROM {cfg.dbname}.{cfg.dbnodes} AS n - INNER JOIN tools AS t ON t.id = n.tool_id - INNER JOIN units AS u ON u.id = t.unit_id - INNER JOIN nodetypes AS y ON n.nodetype_id = y.id - WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}' - ORDER BY n.num; - """ - #logger.info(f"{unit} - {tool}: Executing query: {query}") - cur.execute(query) - results = cur.fetchall() - logger.info(f"{unit} - {tool}: {cur.rowcount} rows selected to get node type/Ain/Din/channels.") - cur.close() - conn.close() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(f""" + SELECT t.name AS name, n.seq AS seq, n.num AS num, n.channels AS channels, y.type AS type, n.ain AS ain, n.din AS din + FROM {cfg.dbname}.{cfg.dbnodes} AS n + INNER JOIN tools AS t ON t.id = n.tool_id + INNER JOIN units AS u ON u.id = t.unit_id + INNER JOIN nodetypes AS y ON n.nodetype_id = y.id + WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}' + ORDER BY n.num; + """) + + results = await cur.fetchall() + logger.info(f"{unit} - {tool}: {cur.rowcount} rows selected to get node type/Ain/Din/channels.") + + if not results: + logger.info(f"{unit} - {tool}: Node/Channels/Ain/Din not defined.") + return None, None, None, None + else: + channels, types, ains, dins = [], [], [], [] + for row in results: + channels.append(row['channels']) + types.append(row['type']) + ains.append(row['ain']) + dins.append(row['din']) + return channels, types, ains, dins - if not results: - logger.info(f"{unit} - {tool}: Node/Channels/Ain/Din not defined.") - return None, None, None, None - else: - channels, types, ains, dins = [], [], [], [] - for row in results: - channels.append(row['channels']) - types.append(row['type']) - ains.append(row['ain']) - dins.append(row['din']) - return channels, types, ains, dins