reorg elab_query

This commit is contained in:
2025-08-09 19:09:40 +02:00
parent 5fc40093e2
commit 3a3b63e360
9 changed files with 166 additions and 119 deletions

View File

@@ -14,6 +14,7 @@ CREATE TABLE `received` (
`matlab_timestamp` timestamp NULL DEFAULT NULL,
`inserted_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`loaded_at` timestamp DEFAULT NULL,
`start_elab_at` timestamp NULL DEFAULT NULL,
`elaborated_at` timestamp NULL DEFAULT NULL,
`sent_raw_at` timestamp NULL DEFAULT NULL,
`sent_elab_at` timestamp NULL DEFAULT NULL,

2
env/ftp.ini vendored
View File

@@ -28,7 +28,7 @@
[tool]
Types = MUX|MUMS|MODB|IPTM|MUSA|LOC|GD|D2W|CR1000X|G301|NESA|GS1|G201|TLP|DSAS|HORTUS|HEALTH-|READINGS-|INTEGRITY MONITOR|MESSPUNKTEPINI_|HIRPINIA|CO_[0-9]{4}_[0-9]|VULINK
Names = LOC[0-9]{4}|DT[0-9]{4}|GD[0-9]{4}|[0-9]{18}|MEASUREMENTS_|CHESA_ARCOIRIS_[0-9]*|TS_PS_PETITES_CROISETTES|CO_[0-9]{4}_[0-9]
Alias = CO_:=|HEALTH-:HEALTH|READINGS-:READINGS|MESSPUNKTEPINI_:MESSPUNKTEPINI|
Alias = CO_:CO|HEALTH-:HEALTH|READINGS-:READINGS|MESSPUNKTEPINI_:MESSPUNKTEPINI|
[csv]
Infos = IP|Subnet|Gateway

View File

@@ -7,7 +7,7 @@ import asyncio
# Import custom modules for configuration and database connection
from utils.config import loader_matlab_elab as setting
from utils.database import WorkflowFlags
from utils.database.matlab_query import get_matlab_command
from utils.database.action_query import get_tool_info
from utils.csv.loaders import get_next_csv_atomic
from utils.orchestrator_utils import run_orchestrator, worker_context
from utils.database.loader_action import update_status, unlock
@@ -45,11 +45,11 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
if record:
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
if tool_type.lower() != "gd": # i tool GD non devono essere elaborati
tool_elab_info = await get_matlab_command(cfg, tool_name.upper(), unit_name.upper(), pool)
tool_elab_info = await get_tool_info(WorkflowFlags.DATA_ELABORATED, tool_name.upper(), unit_name.upper(), pool)
if tool_elab_info:
if tool_elab_info['statustools'].lower() in cfg.elab_status:
logger.info(f"Elaborazione id {id} per {unit_name} {tool_name} ")
await update_status(cfg, id, WorkflowFlags.START_ELAB, pool)
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}"
proc = await asyncio.create_subprocess_shell(
matlab_cmd,

View File

@@ -10,7 +10,8 @@ from utils.database import WorkflowFlags
from utils.csv.loaders import get_next_csv_atomic
from utils.orchestrator_utils import run_orchestrator, worker_context
from utils.database.loader_action import update_status, unlock
from utils.database.elab_query import get_data_as_csv
from utils.database.action_query import get_data_as_csv, get_tool_info, get_elab_timestamp
from utils.general import alterna_valori
#from utils.ftp.elab_send import send_csv_to_customer
@@ -20,7 +21,7 @@ logger = logging.getLogger()
# Delay tra un processamento CSV e il successivo (in secondi)
ELAB_PROCESSING_DELAY = 0.2
# Tempo di attesa se non ci sono record da elaborare
NO_RECORD_SLEEP = 60
NO_RECORD_SLEEP = 30
async def worker(worker_id: int, cfg: object, pool: object) -> None:
@@ -30,27 +31,32 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
logger.info("Avviato")
alternatore = alterna_valori([WorkflowFlags.CSV_RECEIVED,WorkflowFlags.SENT_RAW_DATA], [WorkflowFlags.DATA_ELABORATED, WorkflowFlags.SENT_ELAB_DATA])
while True:
try:
logger.info("Inizio elaborazione")
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.CSV_RECEIVED, WorkflowFlags.SENT_RAW_DATA)
status, fase = next(alternatore)
record = await get_next_csv_atomic(pool, cfg.dbrectable, status, fase)
if record:
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
'''
if tool_elab_info['ftp_send']:
tool_elab_info = await get_tool_info(fase, unit_name, tool_name, pool)
if fase == WorkflowFlags.SENT_ELAB_DATA and tool_elab_info['ftp_send']:
timestamp_matlab_elab = get_elab_timestamp(id, pool)
if not tool_elab_info["duedate"] or tool_elab_info["duedate"] in ('0000-00-00 00:00:00', '') or tool_elab_info["duedate"] > timestamp_matlab_elab:
if elab_csv := await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
print(elab_csv)
#if await send_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
#await update_status(cfg, id, , pool)
#await update_status(cfg, id, , pool)
if True:
await update_status(cfg, id, WorkflowFlags.SENT_ELAB_DATA, pool)
else:
logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: ftp put didn't executed because due date reached.")
'''
elif fase == WorkflowFlags.SENT_RAW_DATA and tool_elab_info['ftp_send_raw']:
...
await unlock(cfg, id, pool)
else:
logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP)

View File

@@ -5,12 +5,13 @@ class WorkflowFlags:
SENT_RAW_DATA = 4 # 0100
SENT_ELAB_DATA = 8 # 1000
DUMMY_ELABORATED = 16 # 10000
START_ELAB = 32 # 100000
# Mappatura flag -> colonna timestamp
FLAG_TO_TIMESTAMP = {
WorkflowFlags.CSV_RECEIVED: "inserted_at",
WorkflowFlags.DATA_LOADED: "loaded_at",
WorkflowFlags.START_ELAB: "start_elab_at",
WorkflowFlags.DATA_ELABORATED: "elaborated_at",
WorkflowFlags.SENT_RAW_DATA: "sent_raw_at",
WorkflowFlags.SENT_ELAB_DATA: "sent_elab_at",

View File

@@ -0,0 +1,131 @@
import logging
import aiomysql
import csv
from io import StringIO
from utils.database import WorkflowFlags
logger = logging.getLogger(__name__)
sub_select = {
WorkflowFlags.DATA_ELABORATED:
"""m.matcall, s.`desc` AS statustools""",
WorkflowFlags.SENT_RAW_DATA:
"""t.ftp_send, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, s.`desc` AS statustools, IFNULL(u.duedate, "") AS duedate""",
WorkflowFlags.SENT_ELAB_DATA:
"""t.ftp_send_raw, IFNULL(u.ftp_mode_raw, "") AS ftp_mode_raw,
IFNULL(u.ftp_addrs_raw, "") AS ftp_addrs_raw, IFNULL(u.ftp_user_raw, "") AS ftp_user_raw,
IFNULL(u.ftp_passwd_raw, "") AS ftp_passwd_raw, IFNULL(u.ftp_filename_raw, "") AS ftp_filename_raw,
IFNULL(u.ftp_parm_raw, "") AS ftp_parm_raw, IFNULL(u.ftp_target_raw, "") AS ftp_target_raw,
t.unit_id, s.`desc` AS statustools, u.inoltro_ftp_raw, u.inoltro_api_raw,
IFNULL(u.inoltro_api_url_raw, "") AS inoltro_api_url_raw,
IFNULL(u.inoltro_api_bearer_token_raw, "") AS inoltro_api_bearer_token_raw,
t.api_send_raw, IFNULL(u.duedate, "") AS duedate
"""
}
async def get_tool_info(next_status: int, unit: str, tool: str, pool: object) -> tuple:
"""
Retrieves tool-specific information from the database based on the next workflow status,
unit name, and tool name.
This function dynamically selects columns based on the `next_status` provided,
joining `matfuncs`, `tools`, `units`, and `statustools` tables.
Args:
next_status (int): The next workflow status flag (e.g., WorkflowFlags.DATA_ELABORATED).
This determines which set of columns to select from the database.
unit (str): The name of the unit associated with the tool.
tool (str): The name of the tool.
pool (object): The database connection pool.
Returns:
tuple: A dictionary-like object (aiomysql.DictCursor result) containing the tool information,
or None if no information is found for the given unit and tool.
"""
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(f"""
SELECT {sub_select[next_status]}
FROM matfuncs AS m
INNER JOIN tools AS t ON t.matfunc = m.id
INNER JOIN units AS u ON u.id = t.unit_id
INNER JOIN statustools AS s ON t.statustool_id = s.id
WHERE t.name = '{tool}' AND u.name = '{unit}';
""")
result = await cur.fetchone()
if not result:
logger.error(f"{unit} - {tool}: Tool info not found.")
return None
else:
return result
async def get_data_as_csv(cfg: dict, id_recv: int, unit: str, tool: str, matlab_timestamp: float, pool: object) -> str:
"""
Retrieves elaborated data from the database and formats it as a CSV string.
The query selects data from the `ElabDataView` based on `UnitName`, `ToolNameID`,
and a `updated_at` timestamp, then orders it. The first row of the CSV will be
the column headers.
Args:
cfg (dict): Configuration dictionary (not directly used in the query but passed for consistency).
id_recv (int): The ID of the record being processed (used for logging).
pool (object): The database connection pool.
unit (str): The name of the unit to filter the data.
tool (str): The ID of the tool to filter the data.
matlab_timestamp (float): A timestamp used to filter data updated after this time.
Returns:
str: A string containing the elaborated data in CSV format.
"""
query = """
select * from (
select 'ToolNameID', 'EventDate', 'EventTime', 'NodeNum', 'NodeType', 'NodeDepth',
'XShift', 'YShift', 'ZShift' , 'X', 'Y', 'Z', 'HShift', 'HShiftDir', 'HShift_local',
'speed', 'speed_local', 'acceleration', 'acceleration_local', 'T_node', 'water_level', 'pressure', 'load_value', 'AlfaX', 'AlfaY', 'CalcErr'
union all
select ToolNameID, EventDate, EventTime, NodeNum, NodeType, NodeDepth,
XShift, YShift, ZShift , X, Y, Z, HShift, HShiftDir, HShift_local,
speed, speed_local, acceleration, acceleration_local, T_node, water_level, pressure, load_value, AlfaX, AlfaY, calcerr
from ElabDataView
where UnitName = %s and ToolNameID = %s and updated_at > %s
order by ToolNameID DESC, concat(EventDate, EventTime), convert(`NodeNum`, UNSIGNED INTEGER) DESC
) resulting_set
"""
async with pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await cur.execute(query, (unit, tool, matlab_timestamp))
results = await cur.fetchall()
logger.info(f"id {id_recv} - {unit} - {tool}: estratti i dati per invio CSV")
logger.info(f"Numero di righe estratte: {len(results)}")
# Creare CSV in memoria
output = StringIO()
writer = csv.writer(output, delimiter=",", lineterminator="\n", quoting=csv.QUOTE_MINIMAL)
for row in results:
writer.writerow(row)
csv_data = output.getvalue()
output.close()
return csv_data
except Exception as e:
logger.error(f"id {id_recv} - {unit} - {tool} - errore nel query creazione csv: {e}")
return None
async def get_elab_timestamp(id_recv: int, pool: object) -> float:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await cur.execute(f"""SELECT start_elab_at from received where id = {id_recv}""")
results = await cur.fetchone()
return results[0]
except Exception as e:
logger.error(f"id {id_recv} - Errore nella query timestamp elaborazione: {e}")
return None

View File

@@ -1,60 +0,0 @@
import csv
from io import StringIO
import logging
logger = logging.getLogger(__name__)
async def get_data_as_csv(cfg: dict, id_recv: int, unit: str, tool: str, matlab_timestamp: float, pool: object) -> str:
"""
Retrieves elaborated data from the database and formats it as a CSV string.
The query selects data from the `ElabDataView` based on `UnitName`, `ToolNameID`,
and a `updated_at` timestamp, then orders it. The first row of the CSV will be
the column headers.
Args:
cfg (dict): Configuration dictionary (not directly used in the query but passed for consistency).
id_recv (int): The ID of the record being processed (used for logging).
pool (object): The database connection pool.
unit (str): The name of the unit to filter the data.
tool (str): The ID of the tool to filter the data.
matlab_timestamp (float): A timestamp used to filter data updated after this time.
Returns:
str: A string containing the elaborated data in CSV format.
"""
query = """
select * from (
select 'ToolNameID', 'EventDate', 'EventTime', 'NodeNum', 'NodeType', 'NodeDepth',
'XShift', 'YShift', 'ZShift' , 'X', 'Y', 'Z', 'HShift', 'HShiftDir', 'HShift_local',
'speed', 'speed_local', 'acceleration', 'acceleration_local', 'T_node', 'water_level', 'pressure', 'load_value', 'AlfaX', 'AlfaY', 'CalcErr'
union all
select ToolNameID, EventDate, EventTime, NodeNum, NodeType, NodeDepth,
XShift, YShift, ZShift , X, Y, Z, HShift, HShiftDir, HShift_local,
speed, speed_local, acceleration, acceleration_local, T_node, water_level, pressure, load_value, AlfaX, AlfaY, calcerr
from ElabDataView
where UnitName = %s and ToolNameID = %s and updated_at > %s
order by ToolNameID DESC, concat(EventDate, EventTime), convert(`NodeNum`, UNSIGNED INTEGER) DESC
) resulting_set
"""
async with pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await cur.execute(query, (unit, tool, matlab_timestamp))
results = await cur.fetchall()
logger.info(f"id {id_recv} - {unit} - {tool}: estratti i dati per invio CSV")
logger.info(f"Numero di righe estratte: {len(results)}")
# Creare CSV in memoria
output = StringIO()
writer = csv.writer(output, delimiter=",", lineterminator="\n", quoting=csv.QUOTE_MINIMAL)
for row in results:
writer.writerow(row)
csv_data = output.getvalue()
output.close()
return csv_data
except Exception as e:
logger.error(f"id {id_recv} - {unit} - {tool} - errore nel query creazione csv: {e}")
return None

View File

@@ -1,39 +0,0 @@
import logging
import aiomysql
logger = logging.getLogger(__name__)
async def get_matlab_command(cfg: object, tool: str, unit: str, pool: object) -> tuple:
"""Recupera le informazioni per l'esecuzione di un comando Matlab dal database.
Interroga il database per ottenere i dettagli necessari all'avvio di uno script
Matlab, basandosi sul nome dello strumento (tool) e dell'unità (unit).
Args:
cfg (object): L'oggetto di configurazione.
tool (str): Il nome dello strumento.
unit (str): Il nome dell'unità.
pool (object): Il pool di connessioni al database.
Returns:
tuple: Una tupla contenente le informazioni del comando Matlab,
o None se non viene trovato alcun comando.
"""
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(f"""
SELECT m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate from matfuncs as m
INNER JOIN tools as t on t.matfunc = m.id
INNER JOIN units as u on u.id = t.unit_id
INNER JOIN statustools as s on t.statustool_id = s.id
where t.name = '{tool}' AND u.name = '{unit}';
""")
result = await cur.fetchone()
if not result:
logger.error(f"{unit} - {tool}: Matlab command not found.")
return None
else:
return result

7
src/utils/general.py Normal file
View File

@@ -0,0 +1,7 @@
def alterna_valori(val1: str, val2: str) -> any:
"""
Restituisce alternativamente il primo ed il secondo valore
"""
while True:
yield val1
yield val2