diff --git a/dbddl/received.ddl b/dbddl/received.ddl index c79498e..51c47da 100644 --- a/dbddl/received.ddl +++ b/dbddl/received.ddl @@ -10,8 +10,10 @@ CREATE TABLE `received` ( `tool_data` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `locked` int DEFAULT '0', `status` int DEFAULT '0', + `matlab_timestamp` timestamp NULL DEFAULT NULL, `inserted_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP, `loaded_at` datetime DEFAULT NULL, `elaborated_at` timestamp NULL DEFAULT NULL, + `sent_at` timestamp NULL DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=694 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; \ No newline at end of file diff --git a/elab_orchestrator.py b/elab_orchestrator.py index f59b6a1..c036cad 100755 --- a/elab_orchestrator.py +++ b/elab_orchestrator.py @@ -3,13 +3,18 @@ # Import necessary libraries import logging import asyncio +from datetime import datetime # Import custom modules for configuration and database connection from utils.config import loader_matlab_elab as setting -from utils.database import DATA_LOADED +from utils.database import DATA_LOADED, DATA_ELABORATED, DATA_SENT from utils.database.matlab_query import get_matlab_command from utils.csv.loaders import get_next_csv_atomic from utils.orchestrator_utils import run_orchestrator, worker_context +from utils.database.loader_action import update_status, unlock +from utils.database.elab_query import get_data_as_csv +#from utils.ftp.elab_send import send_csv_to_customer + # Initialize the logger for this module logger = logging.getLogger() @@ -45,26 +50,44 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None: if record: id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record] - matlab_info = await get_matlab_command(cfg, tool_name, unit_name, pool) - if matlab_info: - matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{matlab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name} {tool_name}" + tool_elab_info = await get_matlab_command(cfg, tool_name, unit_name, pool) + if tool_elab_info: + if tool_elab_info['statustools'].lower() in cfg.elab_status: + logger.info(f"Elaborazione id {id} per {unit_name} {tool_name} ") - # matlab_error_filename = f'{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt' + matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name} {tool_name}" + timestamp_matlab_elab = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + proc = await asyncio.create_subprocess_shell( + matlab_cmd, + cwd=cfg.matlab_func_path, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) - proc = await asyncio.create_subprocess_shell( - matlab_cmd, - cwd=cfg.matlab_func_path, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) + stdout, stderr = await proc.communicate() - stdout, stderr = await proc.communicate() + if proc.returncode != 0: + logger.error("Errore durante l'elaborazione") + logger.error(stderr.decode().strip()) + with open(f"{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt", "w") as f: + f.write(stderr.decode().strip()) + else: + logger.info(stdout.decode().strip()) + await update_status(cfg, id, DATA_ELABORATED, pool) + if not tool_elab_info["duedate"] or tool_elab_info["duedate"] in ('0000-00-00 00:00:00', '') or tool_elab_info["duedate"] > timestamp_matlab_elab: + if tool_elab_info['ftp_send']: + if elab_csv := await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool): + print(elab_csv) + #if await send_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool): + #await update_status(cfg, id, DATA_SENT, pool) + await update_status(cfg, id, DATA_SENT, pool) + else: + logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: ftp put didn't executed because due date reached.") - if proc.returncode != 0: - logger.error("Errore durante l'elaborazione") - logger.error(stderr.decode().strip()) - logger.info(stdout.decode().strip()) - await asyncio.sleep(ELAB_PROCESSING_DELAY) + await unlock(cfg, id, pool) + await asyncio.sleep(ELAB_PROCESSING_DELAY) + else: + logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['statustools']}: MatLab calc by-passed.") else: logger.info("Nessun record disponibile") await asyncio.sleep(NO_RECORD_SLEEP) diff --git a/env/elab.ini b/env/elab.ini index fb04fd1..f2e2690 100644 --- a/env/elab.ini +++ b/env/elab.ini @@ -4,6 +4,10 @@ [threads] max_num = 10 +[tool] + # stati in minuscolo + elab_status = active|manual update + [matlab] #runtime = /usr/local/MATLAB/MATLAB_Runtime/v93 #func_path = /usr/local/matlab_func/ diff --git a/utils/config/loader_matlab_elab.py b/utils/config/loader_matlab_elab.py index 1412a16..c6fe9d2 100644 --- a/utils/config/loader_matlab_elab.py +++ b/utils/config/loader_matlab_elab.py @@ -30,6 +30,9 @@ class Config: self.dbrawdata = c.get("tables", "rawTableName") self.dbnodes = c.get("tables", "nodesTableName") + # Tool + self.elab_status = [part for part in c.get("tool", "elab_status").split('|')] + # Matlab self.matlab_runtime = c.get("matlab", "runtime") self.matlab_func_path = c.get("matlab", "func_path") diff --git a/utils/database/__init__.py b/utils/database/__init__.py index 45c2ad3..e7a09f7 100644 --- a/utils/database/__init__.py +++ b/utils/database/__init__.py @@ -1,3 +1,4 @@ CSV_RECEIVED = 0 DATA_LOADED = 1 -DATA_ELABORATED = 2 \ No newline at end of file +DATA_ELABORATED = 2 +DATA_SENT = 3 \ No newline at end of file diff --git a/utils/database/elab_query.py b/utils/database/elab_query.py new file mode 100644 index 0000000..c12c40d --- /dev/null +++ b/utils/database/elab_query.py @@ -0,0 +1,60 @@ +import csv +from io import StringIO +import logging + +logger = logging.getLogger(__name__) + +async def get_data_as_csv(cfg: dict, id_recv: int, unit: str, tool: str, matlab_timestamp: float, pool: object) -> str: + """ + Retrieves elaborated data from the database and formats it as a CSV string. + + The query selects data from the `ElabDataView` based on `UnitName`, `ToolNameID`, + and a `updated_at` timestamp, then orders it. The first row of the CSV will be + the column headers. + + Args: + cfg (dict): Configuration dictionary (not directly used in the query but passed for consistency). + id (int): The ID of the record being processed (used for logging). + pool (object): The database connection pool. + unit (str): The name of the unit to filter the data. + tool (str): The ID of the tool to filter the data. + matlab_timestamp (float): A timestamp used to filter data updated after this time. + + Returns: + str: A string containing the elaborated data in CSV format. + """ + query = """ + select * from ( + select 'ToolNameID', 'EventDate', 'EventTime', 'NodeNum', 'NodeType', 'NodeDepth', + 'XShift', 'YShift', 'ZShift' , 'X', 'Y', 'Z', 'HShift', 'HShiftDir', 'HShift_local', + 'speed', 'speed_local', 'acceleration', 'acceleration_local', 'T_node', 'water_level', 'pressure', 'load_value', 'AlfaX', 'AlfaY', 'CalcErr' + union all + select ToolNameID, EventDate, EventTime, NodeNum, NodeType, NodeDepth, + XShift, YShift, ZShift , X, Y, Z, HShift, HShiftDir, HShift_local, + speed, speed_local, acceleration, acceleration_local, T_node, water_level, pressure, load_value, AlfaX, AlfaY, calcerr + from ElabDataView + where UnitName = %s and ToolNameID = %s and updated_at > %s + order by ToolNameID DESC, concat(EventDate, EventTime), convert(`NodeNum`, UNSIGNED INTEGER) DESC + ) resulting_set + """ + async with pool.acquire() as conn: + async with conn.cursor() as cur: + try: + await cur.execute(query, (unit, tool, matlab_timestamp)) + results = await cur.fetchall() + logger.info(f"id {id_recv} - {unit} - {tool}: estratti i dati per invio CSV") + logger.info(f"Numero di righe estratte: {len(results)}") + + # Creare CSV in memoria + output = StringIO() + writer = csv.writer(output, delimiter=";", lineterminator="\n", quoting=csv.QUOTE_MINIMAL) + for row in results: + writer.writerow(row) + csv_data = output.getvalue() + output.close() + + return csv_data + + except Exception as e: + logging.error(f"id {id_recv} - {unit} - {tool} - errore nel query creazione csv: {e}") + return None \ No newline at end of file diff --git a/utils/database/loader_action.py b/utils/database/loader_action.py index e18718c..061844d 100644 --- a/utils/database/loader_action.py +++ b/utils/database/loader_action.py @@ -4,7 +4,7 @@ import asyncio logger = logging.getLogger(__name__) -timestamp_cols = ["inserted_at", "loaded_at", "elaborated_at"] +timestamp_cols = ["inserted_at", "loaded_at", "elaborated_at", "sent_at"] async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool: @@ -109,7 +109,7 @@ async def update_status(cfg: object, id: int, status: int, pool: object) -> None f"update {cfg.dbrectable} set status = {status}, {timestamp_cols[status]} = now() where id = {id}" ) await conn.commit() - logging.info("Status updated.") + logging.info(f"Status updated id {id}.") except Exception as e: await conn.rollback() logging.error(f"Error: {e}") diff --git a/utils/ftp/elab_send.py b/utils/ftp/elab_send.py new file mode 100644 index 0000000..33225ed --- /dev/null +++ b/utils/ftp/elab_send.py @@ -0,0 +1,69 @@ +import ftplib +from io import BytesIO +import logging +import aiomysql + +logger = logging.getLogger(__name__) + +async def send_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool: + """ + Sends elaborated CSV data to a customer via FTP. + + Retrieves FTP connection details from the database based on the unit name, + then establishes an FTP connection and uploads the CSV data. + + Args: + cfg (dict): Configuration dictionary (not directly used in this function but passed for consistency). + id (int): The ID of the record being processed (used for logging). + unit (str): The name of the unit associated with the data. + tool (str): The name of the tool associated with the data. + csv_data (str): The CSV data as a string to be sent. + pool (object): The database connection pool. + + Returns: + bool: True if the CSV data was sent successfully, False otherwise. + """ + query = """ + select ftp_addrs, ftp_user, ftp_passwd, ftp_parm, ftp_filename, ftp_target, duedate from units + where name = '%s'";' + """ + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + try: + await cur.execute(query, (unit,)) + send_ftp_info = await cur.fetchone() + logger.info(f"id {id} - {unit} - {tool}: estratti i dati per invio via ftp") + except Exception as e: + logging.error(f"id {id} - {unit} - {tool} - errore nel query per invio ftp: {e}") + + try: + # Converti in bytes + csv_bytes = csv_data.encode('utf-8') + csv_buffer = BytesIO(csv_bytes) + + # Connessione FTP + with ftplib.FTP(send_ftp_info["ftp_addrs"]) as ftp: + ftp.login(send_ftp_info["ftp_user"], send_ftp_info["ftp_passwd"]) + + # Cambia directory + if send_ftp_info["ftp_target"] != "/": + ftp.cwd(send_ftp_info["ftp_target"]) + + # Invia il file + result = ftp.storbinary(f'STOR {send_ftp_info["ftp_filename"]}', csv_buffer) + + if result.startswith('226'): + logging.info(f"File {send_ftp_info["ftp_filename"]} inviato con successo") + return True + else: + logging.error(f"Errore nell'invio: {result}") + return False + + except ftplib.all_errors as e: + logging.error(f"Errore FTP: {e}") + return False + except Exception as e: + logging.error(f"Errore generico: {e}") + return False + finally: + csv_buffer.close()