dict cursor e pool conn
This commit is contained in:
@@ -48,22 +48,23 @@ async def worker(worker_id: int, cfg: object, pool) -> None:
|
|||||||
|
|
||||||
if record:
|
if record:
|
||||||
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
|
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
|
||||||
matlab_info = await get_matlab_command(cfg, tool_name, unit_name)
|
matlab_info = await get_matlab_command(cfg, tool_name, unit_name, pool)
|
||||||
matlab_cmd = f"timeout {cfg.timeout} ./run_{matlab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name} {tool_name}"
|
if matlab_info:
|
||||||
|
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{matlab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name} {tool_name}"
|
||||||
|
|
||||||
# matlab_error_filename = f'{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt'
|
# matlab_error_filename = f'{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt'
|
||||||
|
|
||||||
success = await subprocess.run(matlab_cmd,
|
success = await subprocess.run(matlab_cmd,
|
||||||
cwd=cfg.matlab_func_path,
|
cwd=cfg.matlab_func_path,
|
||||||
capture_output=True,
|
capture_output=True,
|
||||||
text=True,
|
text=True,
|
||||||
check=True)
|
check=True)
|
||||||
|
|
||||||
if not success:
|
if not success:
|
||||||
logger.error("Errore durante l'elaborazione")
|
logger.error("Errore durante l'elaborazione")
|
||||||
await asyncio.sleep(ELAB_PROCESSING_DELAY)
|
await asyncio.sleep(ELAB_PROCESSING_DELAY)
|
||||||
else:
|
else:
|
||||||
logger.debug("Nessun record disponibile")
|
logger.info("Nessun record disponibile")
|
||||||
await asyncio.sleep(NO_RECORD_SLEEP)
|
await asyncio.sleep(NO_RECORD_SLEEP)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
10
env/elab.ini
vendored
10
env/elab.ini
vendored
@@ -1,14 +1,14 @@
|
|||||||
[logging]
|
[logging]
|
||||||
logFilename = ./load_raw_data.log
|
logFilename = ./elab_data.log
|
||||||
|
|
||||||
[threads]
|
[threads]
|
||||||
max_num = 20
|
max_num = 10
|
||||||
|
|
||||||
[matlab]
|
[matlab]
|
||||||
runtime = "/usr/local/MATLAB/MATLAB_Runtime/v93"
|
runtime = /usr/local/MATLAB/MATLAB_Runtime/v93
|
||||||
func_path = "/usr/local/matlab_func/"
|
func_path = /usr/local/matlab_func/
|
||||||
timeout = 1800
|
timeout = 1800
|
||||||
error = ""
|
error = ""
|
||||||
error_path = "/tmp/"
|
error_path = /tmp/
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ async def worker(worker_id: int, cfg: object, pool) -> None:
|
|||||||
logger.error("Errore durante l'elaborazione")
|
logger.error("Errore durante l'elaborazione")
|
||||||
await asyncio.sleep(CSV_PROCESSING_DELAY)
|
await asyncio.sleep(CSV_PROCESSING_DELAY)
|
||||||
else:
|
else:
|
||||||
logger.debug("Nessun record disponibile")
|
logger.info("Nessun record disponibile")
|
||||||
await asyncio.sleep(NO_RECORD_SLEEP)
|
await asyncio.sleep(NO_RECORD_SLEEP)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ import mysql.connector
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def connetti_db(cfg: object) -> object:
|
def nnn_connetti_db(cfg: object) -> object:
|
||||||
"""
|
"""
|
||||||
Establishes a connection to a MySQL database.
|
Establishes a connection to a MySQL database.
|
||||||
|
|
||||||
|
|||||||
@@ -1,27 +1,24 @@
|
|||||||
from utils.database.connection import connetti_db
|
|
||||||
import logging
|
import logging
|
||||||
|
import aiomysql
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def get_matlab_command(cfg: object, tool: str, unit: str) -> tuple:
|
async def get_matlab_command(cfg: object, tool: str, unit: str, pool) -> tuple:
|
||||||
|
|
||||||
with connetti_db(cfg) as conn:
|
async with pool.acquire() as conn:
|
||||||
cur = conn.cursor(dictionary=True)
|
async with conn.cursor(aiomysql.DictCursor) as cur:
|
||||||
query = f"""
|
await cur.execute(f"""
|
||||||
SELECT m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate from matfuncs as m
|
SELECT m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate from matfuncs as m
|
||||||
INNER JOIN tools as t on t.matfunc = m.id
|
INNER JOIN tools as t on t.matfunc = m.id
|
||||||
INNER JOIN units as u on u.id = t.unit_id
|
INNER JOIN units as u on u.id = t.unit_id
|
||||||
INNER JOIN statustools as s on t.statustool_id = s.id
|
INNER JOIN statustools as s on t.statustool_id = s.id
|
||||||
where t.name = '{tool}' AND u.name = '{unit}';
|
where t.name = '{tool}' AND u.name = '{unit}';
|
||||||
|
""")
|
||||||
|
|
||||||
"""
|
result = await cur.fetchone()
|
||||||
cur.execute(query)
|
|
||||||
result = cur.fetchone()
|
|
||||||
cur.close()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
if not result:
|
if not result:
|
||||||
logger.error(f"{unit} - {tool}: Matlab command not found.")
|
logger.error(f"{unit} - {tool}: Matlab command not found.")
|
||||||
return None
|
return None
|
||||||
else:
|
else:
|
||||||
return result
|
return result
|
||||||
@@ -1,37 +1,36 @@
|
|||||||
from utils.database.connection import connetti_db
|
|
||||||
|
import aiomysql
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def get_nodes_type(cfg: object, tool: str, unit: str) -> tuple:
|
async def get_nodes_type(cfg: object, tool: str, unit: str, pool) -> tuple:
|
||||||
|
|
||||||
with connetti_db(cfg) as conn:
|
async with pool.acquire() as conn:
|
||||||
cur = conn.cursor(dictionary=True)
|
async with conn.cursor(aiomysql.DictCursor) as cur:
|
||||||
query = f"""
|
await cur.execute(f"""
|
||||||
SELECT t.name AS name, n.seq AS seq, n.num AS num, n.channels AS channels, y.type AS type, n.ain AS ain, n.din AS din
|
SELECT t.name AS name, n.seq AS seq, n.num AS num, n.channels AS channels, y.type AS type, n.ain AS ain, n.din AS din
|
||||||
FROM {cfg.dbname}.{cfg.dbnodes} AS n
|
FROM {cfg.dbname}.{cfg.dbnodes} AS n
|
||||||
INNER JOIN tools AS t ON t.id = n.tool_id
|
INNER JOIN tools AS t ON t.id = n.tool_id
|
||||||
INNER JOIN units AS u ON u.id = t.unit_id
|
INNER JOIN units AS u ON u.id = t.unit_id
|
||||||
INNER JOIN nodetypes AS y ON n.nodetype_id = y.id
|
INNER JOIN nodetypes AS y ON n.nodetype_id = y.id
|
||||||
WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}'
|
WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}'
|
||||||
ORDER BY n.num;
|
ORDER BY n.num;
|
||||||
"""
|
""")
|
||||||
#logger.info(f"{unit} - {tool}: Executing query: {query}")
|
|
||||||
cur.execute(query)
|
results = await cur.fetchall()
|
||||||
results = cur.fetchall()
|
logger.info(f"{unit} - {tool}: {cur.rowcount} rows selected to get node type/Ain/Din/channels.")
|
||||||
logger.info(f"{unit} - {tool}: {cur.rowcount} rows selected to get node type/Ain/Din/channels.")
|
|
||||||
cur.close()
|
if not results:
|
||||||
conn.close()
|
logger.info(f"{unit} - {tool}: Node/Channels/Ain/Din not defined.")
|
||||||
|
return None, None, None, None
|
||||||
|
else:
|
||||||
|
channels, types, ains, dins = [], [], [], []
|
||||||
|
for row in results:
|
||||||
|
channels.append(row['channels'])
|
||||||
|
types.append(row['type'])
|
||||||
|
ains.append(row['ain'])
|
||||||
|
dins.append(row['din'])
|
||||||
|
return channels, types, ains, dins
|
||||||
|
|
||||||
if not results:
|
|
||||||
logger.info(f"{unit} - {tool}: Node/Channels/Ain/Din not defined.")
|
|
||||||
return None, None, None, None
|
|
||||||
else:
|
|
||||||
channels, types, ains, dins = [], [], [], []
|
|
||||||
for row in results:
|
|
||||||
channels.append(row['channels'])
|
|
||||||
types.append(row['type'])
|
|
||||||
ains.append(row['ain'])
|
|
||||||
dins.append(row['din'])
|
|
||||||
return channels, types, ains, dins
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user