diff --git a/dbddl/received.ddl b/dbddl/received.ddl
index c1b2e50..90d112c 100644
--- a/dbddl/received.ddl
+++ b/dbddl/received.ddl
@@ -14,6 +14,7 @@ CREATE TABLE `received` (
   `matlab_timestamp` timestamp NULL DEFAULT NULL,
   `inserted_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
   `loaded_at` timestamp DEFAULT NULL,
+  `start_elab_at` timestamp NULL DEFAULT NULL,
   `elaborated_at` timestamp NULL DEFAULT NULL,
   `sent_raw_at` timestamp NULL DEFAULT NULL,
   `sent_elab_at` timestamp NULL DEFAULT NULL,
diff --git a/env/ftp.ini b/env/ftp.ini
index fe75176..393e86f 100644
--- a/env/ftp.ini
+++ b/env/ftp.ini
@@ -28,7 +28,7 @@
 [tool]
 Types = MUX|MUMS|MODB|IPTM|MUSA|LOC|GD|D2W|CR1000X|G301|NESA|GS1|G201|TLP|DSAS|HORTUS|HEALTH-|READINGS-|INTEGRITY MONITOR|MESSPUNKTEPINI_|HIRPINIA|CO_[0-9]{4}_[0-9]|VULINK
 Names = LOC[0-9]{4}|DT[0-9]{4}|GD[0-9]{4}|[0-9]{18}|MEASUREMENTS_|CHESA_ARCOIRIS_[0-9]*|TS_PS_PETITES_CROISETTES|CO_[0-9]{4}_[0-9]
-Alias = CO_:=|HEALTH-:HEALTH|READINGS-:READINGS|MESSPUNKTEPINI_:MESSPUNKTEPINI|
+Alias = CO_:CO|HEALTH-:HEALTH|READINGS-:READINGS|MESSPUNKTEPINI_:MESSPUNKTEPINI|
 
 [csv]
 Infos = IP|Subnet|Gateway
diff --git a/src/elab_orchestrator.py b/src/elab_orchestrator.py
index 90c2108..fa36b1a 100755
--- a/src/elab_orchestrator.py
+++ b/src/elab_orchestrator.py
@@ -7,7 +7,7 @@ import asyncio
 # Import custom modules for configuration and database connection
 from utils.config import loader_matlab_elab as setting
 from utils.database import WorkflowFlags
-from utils.database.matlab_query import get_matlab_command
+from utils.database.action_query import get_tool_info
 from utils.csv.loaders import get_next_csv_atomic
 from utils.orchestrator_utils import run_orchestrator, worker_context
 from utils.database.loader_action import update_status, unlock
@@ -45,11 +45,11 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
         if record:
             id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
             if tool_type.lower() != "gd":  # i tool GD non devono essere elaborati
-                tool_elab_info = await get_matlab_command(cfg, tool_name.upper(), unit_name.upper(), pool)
+                tool_elab_info = await get_tool_info(WorkflowFlags.DATA_ELABORATED, unit_name.upper(), tool_name.upper(), pool)
                 if tool_elab_info:
                     if tool_elab_info['statustools'].lower() in cfg.elab_status:
                         logger.info(f"Elaborazione id {id} per {unit_name} {tool_name} ")
-
+                        await update_status(cfg, id, WorkflowFlags.START_ELAB, pool)
                         matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}"
                         proc = await asyncio.create_subprocess_shell(
                             matlab_cmd,
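Note on the START_ELAB step introduced above: elab_orchestrator now stamps the record just before launching MATLAB, and FLAG_TO_TIMESTAMP (see the src/utils/database/__init__.py hunk further down) ties the new flag to the start_elab_at column added in received.ddl. Below is a minimal sketch of how update_status presumably consumes that mapping; the real helper lives in utils/database/loader_action.py, which this diff does not touch, so the body is an assumption.

    # Sketch only: the real update_status is in utils/database/loader_action.py
    # (not part of this diff). Assumes the flag-to-column mapping is used to
    # stamp the matching timestamp column of the `received` row.
    from utils.database import FLAG_TO_TIMESTAMP

    async def update_status_sketch(cfg, id_recv, flag, pool):
        column = FLAG_TO_TIMESTAMP[flag]  # e.g. START_ELAB -> "start_elab_at"
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    f"UPDATE `{cfg.dbrectable}` SET `{column}` = CURRENT_TIMESTAMP WHERE id = %s",
                    (id_recv,),
                )
                await conn.commit()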
diff --git a/src/send_orchestrator.py b/src/send_orchestrator.py
index 3d8ef50..3251d15 100755
--- a/src/send_orchestrator.py
+++ b/src/send_orchestrator.py
@@ -10,7 +10,8 @@ from utils.database import WorkflowFlags
 from utils.csv.loaders import get_next_csv_atomic
 from utils.orchestrator_utils import run_orchestrator, worker_context
 from utils.database.loader_action import update_status, unlock
-from utils.database.elab_query import get_data_as_csv
+from utils.database.action_query import get_data_as_csv, get_tool_info, get_elab_timestamp
+from utils.general import alterna_valori
 
 #from utils.ftp.elab_send import send_csv_to_customer
 
@@ -20,7 +21,7 @@ logger = logging.getLogger()
 
 # Delay tra un processamento CSV e il successivo (in secondi)
 ELAB_PROCESSING_DELAY = 0.2
 # Tempo di attesa se non ci sono record da elaborare
-NO_RECORD_SLEEP = 60
+NO_RECORD_SLEEP = 30
 
 
 async def worker(worker_id: int, cfg: object, pool: object) -> None:
@@ -30,27 +31,32 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
     debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
     logger.info("Avviato")
 
+    alternatore = alterna_valori([WorkflowFlags.CSV_RECEIVED, WorkflowFlags.SENT_RAW_DATA], [WorkflowFlags.DATA_ELABORATED, WorkflowFlags.SENT_ELAB_DATA])
+
     while True:
         try:
             logger.info("Inizio elaborazione")
-            record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.CSV_RECEIVED, WorkflowFlags.SENT_RAW_DATA)
+            status, fase = next(alternatore)
+            record = await get_next_csv_atomic(pool, cfg.dbrectable, status, fase)
             if record:
                 id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
-
-                '''
-                if tool_elab_info['ftp_send']:
+                tool_elab_info = await get_tool_info(fase, unit_name, tool_name, pool)
+                if fase == WorkflowFlags.SENT_ELAB_DATA and tool_elab_info['ftp_send']:
+                    timestamp_matlab_elab = await get_elab_timestamp(id, pool)
                     if not tool_elab_info["duedate"] or tool_elab_info["duedate"] in ('0000-00-00 00:00:00', '') or tool_elab_info["duedate"] > timestamp_matlab_elab:
                         if elab_csv := await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
                             print(elab_csv)
                             #if await send_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
-                                #await update_status(cfg, id, , pool)
-                                #await update_status(cfg, id, , pool)
+                            if True:  # placeholder until send_csv_to_customer is re-enabled
+                                await update_status(cfg, id, WorkflowFlags.SENT_ELAB_DATA, pool)
                     else:
                         logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: ftp put not executed because the due date has been reached.")
-                '''
+                elif fase == WorkflowFlags.SENT_RAW_DATA and tool_elab_info['ftp_send_raw']:
+                    ...  # raw-data send path not yet implemented
+                await unlock(cfg, id, pool)
             else:
                 logger.info("Nessun record disponibile")
                 await asyncio.sleep(NO_RECORD_SLEEP)
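For clarity, the worker now alternates between two (status, fase) pairs on successive iterations: one pass hunts for freshly received CSVs to forward as raw data, the next for elaborated data to send. Spelled out with the values used above:

    alternatore = alterna_valori(
        [WorkflowFlags.CSV_RECEIVED, WorkflowFlags.SENT_RAW_DATA],
        [WorkflowFlags.DATA_ELABORATED, WorkflowFlags.SENT_ELAB_DATA],
    )
    status, fase = next(alternatore)  # 1st pass: CSV_RECEIVED, SENT_RAW_DATA
    status, fase = next(alternatore)  # 2nd pass: DATA_ELABORATED, SENT_ELAB_DATA
    status, fase = next(alternatore)  # 3rd pass: the raw pair again, and so on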
diff --git a/src/utils/database/__init__.py b/src/utils/database/__init__.py
index 4ee18c3..73e813c 100644
--- a/src/utils/database/__init__.py
+++ b/src/utils/database/__init__.py
@@ -1,16 +1,17 @@
 class WorkflowFlags:
-    CSV_RECEIVED = 0       # 0000
-    DATA_LOADED = 1        # 0001
-    DATA_ELABORATED = 2    # 0010
-    SENT_RAW_DATA = 4      # 0100
-    SENT_ELAB_DATA = 8     # 1000
-    DUMMY_ELABORATED = 16  # 10000
-
+    CSV_RECEIVED = 0       # 0000
+    DATA_LOADED = 1        # 0001
+    DATA_ELABORATED = 2    # 0010
+    SENT_RAW_DATA = 4      # 0100
+    SENT_ELAB_DATA = 8     # 1000
+    DUMMY_ELABORATED = 16  # 10000
+    START_ELAB = 32        # 100000
 
 # Mappatura flag -> colonna timestamp
 FLAG_TO_TIMESTAMP = {
     WorkflowFlags.CSV_RECEIVED: "inserted_at",
     WorkflowFlags.DATA_LOADED: "loaded_at",
+    WorkflowFlags.START_ELAB: "start_elab_at",
     WorkflowFlags.DATA_ELABORATED: "elaborated_at",
     WorkflowFlags.SENT_RAW_DATA: "sent_raw_at",
     WorkflowFlags.SENT_ELAB_DATA: "sent_elab_at",
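The flag values are powers of two, so one integer status can record several completed steps at once; CSV_RECEIVED = 0 is the "nothing done yet" sentinel and, unlike the others, cannot be tested with a bitwise AND. A small illustration (plain Python, not repository code):

    from utils.database import WorkflowFlags, FLAG_TO_TIMESTAMP

    status = WorkflowFlags.DATA_LOADED | WorkflowFlags.START_ELAB  # 1 | 32 == 33
    assert status & WorkflowFlags.START_ELAB             # elaboration has started...
    assert not status & WorkflowFlags.DATA_ELABORATED    # ...but has not finished
    assert FLAG_TO_TIMESTAMP[WorkflowFlags.START_ELAB] == "start_elab_at"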
+ """ + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute(f""" + SELECT {sub_select[next_status]} + FROM matfuncs AS m + INNER JOIN tools AS t ON t.matfunc = m.id + INNER JOIN units AS u ON u.id = t.unit_id + INNER JOIN statustools AS s ON t.statustool_id = s.id + WHERE t.name = '{tool}' AND u.name = '{unit}'; + """) + + result = await cur.fetchone() + + if not result: + logger.error(f"{unit} - {tool}: Tool info not found.") + return None + else: + return result + + +async def get_data_as_csv(cfg: dict, id_recv: int, unit: str, tool: str, matlab_timestamp: float, pool: object) -> str: + """ + Retrieves elaborated data from the database and formats it as a CSV string. + + The query selects data from the `ElabDataView` based on `UnitName`, `ToolNameID`, + and a `updated_at` timestamp, then orders it. The first row of the CSV will be + the column headers. + + Args: + cfg (dict): Configuration dictionary (not directly used in the query but passed for consistency). + id_recv (int): The ID of the record being processed (used for logging). + pool (object): The database connection pool. + unit (str): The name of the unit to filter the data. + tool (str): The ID of the tool to filter the data. + matlab_timestamp (float): A timestamp used to filter data updated after this time. + + Returns: + str: A string containing the elaborated data in CSV format. + """ + query = """ + select * from ( + select 'ToolNameID', 'EventDate', 'EventTime', 'NodeNum', 'NodeType', 'NodeDepth', + 'XShift', 'YShift', 'ZShift' , 'X', 'Y', 'Z', 'HShift', 'HShiftDir', 'HShift_local', + 'speed', 'speed_local', 'acceleration', 'acceleration_local', 'T_node', 'water_level', 'pressure', 'load_value', 'AlfaX', 'AlfaY', 'CalcErr' + union all + select ToolNameID, EventDate, EventTime, NodeNum, NodeType, NodeDepth, + XShift, YShift, ZShift , X, Y, Z, HShift, HShiftDir, HShift_local, + speed, speed_local, acceleration, acceleration_local, T_node, water_level, pressure, load_value, AlfaX, AlfaY, calcerr + from ElabDataView + where UnitName = %s and ToolNameID = %s and updated_at > %s + order by ToolNameID DESC, concat(EventDate, EventTime), convert(`NodeNum`, UNSIGNED INTEGER) DESC + ) resulting_set + """ + async with pool.acquire() as conn: + async with conn.cursor() as cur: + try: + await cur.execute(query, (unit, tool, matlab_timestamp)) + results = await cur.fetchall() + logger.info(f"id {id_recv} - {unit} - {tool}: estratti i dati per invio CSV") + logger.info(f"Numero di righe estratte: {len(results)}") + + # Creare CSV in memoria + output = StringIO() + writer = csv.writer(output, delimiter=",", lineterminator="\n", quoting=csv.QUOTE_MINIMAL) + for row in results: + writer.writerow(row) + csv_data = output.getvalue() + output.close() + + return csv_data + + except Exception as e: + logger.error(f"id {id_recv} - {unit} - {tool} - errore nel query creazione csv: {e}") + return None + + +async def get_elab_timestamp(id_recv: int, pool: object) -> float: + async with pool.acquire() as conn: + async with conn.cursor() as cur: + try: + await cur.execute(f"""SELECT start_elab_at from received where id = {id_recv}""") + results = await cur.fetchone() + return results[0] + + except Exception as e: + logger.error(f"id {id_recv} - Errore nella query timestamp elaborazione: {e}") + return None \ No newline at end of file diff --git a/src/utils/database/elab_query.py b/src/utils/database/elab_query.py deleted file mode 100644 index b7cab53..0000000 --- 
diff --git a/src/utils/database/elab_query.py b/src/utils/database/elab_query.py
deleted file mode 100644
index b7cab53..0000000
--- a/src/utils/database/elab_query.py
+++ /dev/null
@@ -1,60 +0,0 @@
-import csv
-from io import StringIO
-import logging
-
-logger = logging.getLogger(__name__)
-
-async def get_data_as_csv(cfg: dict, id_recv: int, unit: str, tool: str, matlab_timestamp: float, pool: object) -> str:
-    """
-    Retrieves elaborated data from the database and formats it as a CSV string.
-
-    The query selects data from the `ElabDataView` based on `UnitName`, `ToolNameID`,
-    and a `updated_at` timestamp, then orders it. The first row of the CSV will be
-    the column headers.
-
-    Args:
-        cfg (dict): Configuration dictionary (not directly used in the query but passed for consistency).
-        id_recv (int): The ID of the record being processed (used for logging).
-        pool (object): The database connection pool.
-        unit (str): The name of the unit to filter the data.
-        tool (str): The ID of the tool to filter the data.
-        matlab_timestamp (float): A timestamp used to filter data updated after this time.
-
-    Returns:
-        str: A string containing the elaborated data in CSV format.
-    """
-    query = """
-        select * from (
-        select 'ToolNameID', 'EventDate', 'EventTime', 'NodeNum', 'NodeType', 'NodeDepth',
-        'XShift', 'YShift', 'ZShift' , 'X', 'Y', 'Z', 'HShift', 'HShiftDir', 'HShift_local',
-        'speed', 'speed_local', 'acceleration', 'acceleration_local', 'T_node', 'water_level', 'pressure', 'load_value', 'AlfaX', 'AlfaY', 'CalcErr'
-        union all
-        select ToolNameID, EventDate, EventTime, NodeNum, NodeType, NodeDepth,
-        XShift, YShift, ZShift , X, Y, Z, HShift, HShiftDir, HShift_local,
-        speed, speed_local, acceleration, acceleration_local, T_node, water_level, pressure, load_value, AlfaX, AlfaY, calcerr
-        from ElabDataView
-        where UnitName = %s and ToolNameID = %s and updated_at > %s
-        order by ToolNameID DESC, concat(EventDate, EventTime), convert(`NodeNum`, UNSIGNED INTEGER) DESC
-        ) resulting_set
-    """
-    async with pool.acquire() as conn:
-        async with conn.cursor() as cur:
-            try:
-                await cur.execute(query, (unit, tool, matlab_timestamp))
-                results = await cur.fetchall()
-                logger.info(f"id {id_recv} - {unit} - {tool}: estratti i dati per invio CSV")
-                logger.info(f"Numero di righe estratte: {len(results)}")
-
-                # Creare CSV in memoria
-                output = StringIO()
-                writer = csv.writer(output, delimiter=",", lineterminator="\n", quoting=csv.QUOTE_MINIMAL)
-                for row in results:
-                    writer.writerow(row)
-                csv_data = output.getvalue()
-                output.close()
-
-                return csv_data
-
-            except Exception as e:
-                logger.error(f"id {id_recv} - {unit} - {tool} - errore nel query creazione csv: {e}")
-                return None
\ No newline at end of file
diff --git a/src/utils/database/matlab_query.py b/src/utils/database/matlab_query.py
deleted file mode 100644
index f11b5c8..0000000
--- a/src/utils/database/matlab_query.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import logging
-import aiomysql
-
-logger = logging.getLogger(__name__)
-
-async def get_matlab_command(cfg: object, tool: str, unit: str, pool: object) -> tuple:
-    """Recupera le informazioni per l'esecuzione di un comando Matlab dal database.
-
-    Interroga il database per ottenere i dettagli necessari all'avvio di uno script
-    Matlab, basandosi sul nome dello strumento (tool) e dell'unità (unit).
-
-    Args:
-        cfg (object): L'oggetto di configurazione.
-        tool (str): Il nome dello strumento.
-        unit (str): Il nome dell'unità.
-        pool (object): Il pool di connessioni al database.
-
-    Returns:
-        tuple: Una tupla contenente le informazioni del comando Matlab,
-        o None se non viene trovato alcun comando.
- """ - - async with pool.acquire() as conn: - async with conn.cursor(aiomysql.DictCursor) as cur: - await cur.execute(f""" - SELECT m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate from matfuncs as m - INNER JOIN tools as t on t.matfunc = m.id - INNER JOIN units as u on u.id = t.unit_id - INNER JOIN statustools as s on t.statustool_id = s.id - where t.name = '{tool}' AND u.name = '{unit}'; - """) - - result = await cur.fetchone() - - if not result: - logger.error(f"{unit} - {tool}: Matlab command not found.") - return None - else: - return result \ No newline at end of file diff --git a/src/utils/general.py b/src/utils/general.py new file mode 100644 index 0000000..4de451b --- /dev/null +++ b/src/utils/general.py @@ -0,0 +1,7 @@ +def alterna_valori(val1: str, val2: str) -> any: + """ + Restituisce alternativamente il primo ed il secondo valore + """ + while True: + yield val1 + yield val2 \ No newline at end of file