diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..3c2c9a8 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,14 @@ +{ + // Usare IntelliSense per informazioni sui possibili attributi. + // Al passaggio del mouse vengono visualizzate le descrizioni degli attributi esistenti. + // Per altre informazioni, visitare: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: Python File", + "type": "debugpy", + "request": "launch", + "program": "${file}" + } + ] +} \ No newline at end of file diff --git a/dbddl/received.ddl b/dbddl/received.ddl index 69ae6de..59b9215 100644 --- a/dbddl/received.ddl +++ b/dbddl/received.ddl @@ -12,5 +12,6 @@ CREATE TABLE `received` ( `status` int DEFAULT '0', `inserted_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP, `loaded_at` datetime DEFAULT NULL, + `elaborated_at` timestamp NULL DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=694 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; \ No newline at end of file diff --git a/elab_orchestrator.py b/elab_orchestrator.py new file mode 100755 index 0000000..7b7c9fe --- /dev/null +++ b/elab_orchestrator.py @@ -0,0 +1,76 @@ +#!.venv/bin/python + +# Import necessary libraries +import mysql.connector +import logging +import importlib +import time +import asyncio +import subprocess + +# Import custom modules for configuration and database connection +from utils.config import loader as setting +from utils.database.connection import connetti_db +from utils.database.loader_action import DATA_LOADED, get_matlab_cmd + +# Initialize the logger for this module +logger = logging.getLogger(__name__) + +# Function to elaborate CSV data +async def run_matlab_elab(id: int, unit_name: str, unit_type: str, tool_name: str, tool_type: str, semaphore: asyncio.Semaphore) -> bool: + async with semaphore: + if get_matlab_cmd(cfg, unit_name, tool_name): + # If a record is found, lock it by updating the 'locked' field to 1 + + + + + + +async def main(): + # Load the configuration settings + cfg = setting.Config() + + try: + # Configure logging to write log messages to a file with a specific format + logging.basicConfig( + format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ", + filename=cfg.logfilename, + level=logging.INFO, + ) + + + # Limita il numero di esecuzioni concorrenti a max_threads + semaphore = asyncio.Semaphore(cfg.max_threads) + running_tasks = set() + + # Enter an infinite loop to continuously process records + while True: + try: + # Establish a database connection + with connetti_db(cfg) as conn: + cur = conn.cursor() + # Select a single record from the raw data table that is not currently locked and has a status of 0 + cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = {DATA_LOADED} limit 1') + id, unit_name, unit_type, tool_name, tool_type = cur.fetchone() + if id: + task = asyncio.create_task(run_matlab_elab(id, unit_name, unit_type, tool_name, tool_type, semaphore)) + running_tasks.add(task) + # Rimuovi i task completati dal set + running_tasks = {t for t in running_tasks if not t.done()} + + + # If a record was successfully processed, log the number of threads currently running + #logger.info(f"Threads in execution: {len(threads)}") + except Exception as e: + logger.info(f"Error: {e}.") + + except KeyboardInterrupt: + # Handle a keyboard interrupt (e.g., Ctrl+C) to gracefully shut down the program + logger.info("Info: Shutdown requested...exiting") + + except Exception as e: + logger.info(f"Error: {e}.") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/ftp_csv_receiver.ini b/ftp_csv_receiver.ini index eb93652..3b5c820 100644 --- a/ftp_csv_receiver.ini +++ b/ftp_csv_receiver.ini @@ -27,18 +27,17 @@ user = root password = batt1l0 dbName = ase_lar - dbSchema = public userTableName = virtusers recTableName = received rawTableName = RAWDATACOR nodesTableName = nodes [unit] - Types = G801|G201|G301|G802|D2W|GFLOW|CR1000X|TLP|GS1 + Types = G801|G201|G301|G802|D2W|GFLOW|CR1000X|TLP|GS1|HORTUS Names = ID[0-9]{4}|IX[0-9]{4} [tool] - Types = MUX|MUMS|MODB|IPTM|MUSA|LOC|GD|D2W|CR1000X|G301|NESA|GS1|G201|TLP|DSAS + Types = MUX|MUMS|MODB|IPTM|MUSA|LOC|GD|D2W|CR1000X|G301|NESA|GS1|G201|TLP|DSAS|HORTUS Names = LOC[0-9]{4}|DT[0-9]{4}|GD[0-9]{4}|[0-9]{18}|measurement [csv] diff --git a/orchestrator.py b/load_orchestrator.py similarity index 95% rename from orchestrator.py rename to load_orchestrator.py index 02384c0..3057e9c 100755 --- a/orchestrator.py +++ b/load_orchestrator.py @@ -10,18 +10,19 @@ import threading # Import custom modules for configuration and database connection from utils.config import loader as setting from utils.database.connection import connetti_db +from utils.database.loader_action import CSV_RECEIVED # Initialize the logger for this module logger = logging.getLogger(__name__) # Function to elaborate CSV data -def elab_csv(cfg: object, threads: list) -> bool: +def load_csv(cfg: object, threads: list) -> bool: try: # Establish a database connection with connetti_db(cfg) as conn: cur = conn.cursor() # Select a single record from the raw data table that is not currently locked and has a status of 0 - cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = 0 limit 1') + cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = {CSV_RECEIVED} limit 1') id, unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone() if id: # If a record is found, lock it by updating the 'locked' field to 1 @@ -79,7 +80,7 @@ def main(): # Remove it from the list of active threads threads.remove(thread) # Attempt to process a CSV record - if elab_csv(cfg, threads): + if load_csv(cfg, threads): # If a record was successfully processed, log the number of threads currently running logger.info(f"Threads in execution: {len(threads)}") pass diff --git a/utils/config/parser.py b/utils/config/parser.py index a66db83..75be05e 100644 --- a/utils/config/parser.py +++ b/utils/config/parser.py @@ -1,6 +1,6 @@ import re -def extract_value(patterns, primary_source, secondary_source, default='Not Defined'): +def extract_value(patterns: list, primary_source: str, secondary_source: str, default='Not Defined') -> str: """Extracts the first match for a list of patterns from the primary source. Falls back to the secondary source if no match is found. """ diff --git a/utils/database/connection.py b/utils/database/connection.py index 235e0e7..fbd29b1 100644 --- a/utils/database/connection.py +++ b/utils/database/connection.py @@ -3,7 +3,7 @@ import mysql.connector logger = logging.getLogger(__name__) -def connetti_db(cfg): +def connetti_db(cfg: object) -> object: """ Establishes a connection to a MySQL database. diff --git a/utils/database/loader.py b/utils/database/loader.py deleted file mode 100644 index 20abe76..0000000 --- a/utils/database/loader.py +++ /dev/null @@ -1,32 +0,0 @@ -#!.venv/bin/python -from utils.database.connection import connetti_db -import logging - -logger = logging.getLogger(__name__) - -def load_data(cfg, matrice_valori): - sql_insert_RAWDATA = f''' - INSERT IGNORE INTO {cfg.dbname}.{cfg.dbrawdata} ( - `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`, - `Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`, - `Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`, - `BatLevelModule`,`TemperatureModule`, `RssiModule` - ) - VALUES ( - %s, %s, %s, %s, %s, %s, %s, - %s, %s, %s, %s, %s, %s, %s, %s, - %s, %s, %s, %s, %s, %s, %s, %s, - %s, %s, %s - ) - ''' - with connetti_db(cfg) as conn: - cur = conn.cursor() - try: - cur.executemany(sql_insert_RAWDATA, matrice_valori) - conn.commit() - except Exception as e: - conn.rollback() - print(f'Error: {e}') - finally: - cur.close() - conn.close() \ No newline at end of file diff --git a/utils/database/loader_action.py b/utils/database/loader_action.py new file mode 100644 index 0000000..bb78700 --- /dev/null +++ b/utils/database/loader_action.py @@ -0,0 +1,63 @@ +#!.venv/bin/python +from utils.database.connection import connetti_db +import logging + +logger = logging.getLogger(__name__) + +CSV_RECEIVED = 0 +DATA_LOADED = 1 +DATA_ELABORATED = 2 + +def load_data(cfg: object, matrice_valori: list) -> bool : + sql_insert_RAWDATA = f''' + INSERT IGNORE INTO {cfg.dbname}.{cfg.dbrawdata} ( + `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`, + `Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`, + `Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`, + `BatLevelModule`,`TemperatureModule`, `RssiModule` + ) + VALUES ( + %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s + ) + ''' + with connetti_db(cfg) as conn: + cur = conn.cursor() + try: + cur.executemany(sql_insert_RAWDATA, matrice_valori) + conn.commit() + logging.info("Data loaded.") + rc = True + except Exception as e: + conn.rollback() + logging.error(f"Error: {e}.") + rc = False + finally: + conn.close() + return rc + +def update_status(cfg: object, id: int, status: int) -> None: + with connetti_db(cfg) as conn: + cur = conn.cursor() + try: + cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 0, status = {status} where id = {id}') + conn.commit() + except Exception as e: + conn.rollback() + logging.error(f'Error: {e}') + +def get_matlab_cmd(cfg: object, unit: str, tool: str) -> tuple: + with connetti_db(cfg) as conn: + cur = conn.cursor() + try: + cur.execute(f'''select m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate + from matfuncs as m + inner join tools as t on t.matfunc = m.id + inner join units as u on u.id = t.unit_id + inner join statustools as s on t.statustool_id = s.id + where t.name = "{tool}" and u.name = "{unit}"''') + return cur.fetchone() + except Exception as e: + logging.error(f'Error: {e}') \ No newline at end of file diff --git a/utils/database/nodes_query.py b/utils/database/nodes_query.py index 0ed5724..ceaa3cd 100644 --- a/utils/database/nodes_query.py +++ b/utils/database/nodes_query.py @@ -3,7 +3,7 @@ import logging logger = logging.getLogger(__name__) -def get_nodes_type(cfg, tool, unit): +def get_nodes_type(cfg: object, tool: str, unit: str) -> tuple: with connetti_db(cfg) as conn: cur = conn.cursor(dictionary=True) @@ -16,7 +16,7 @@ def get_nodes_type(cfg, tool, unit): WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}' ORDER BY n.num; """ - logger.info(f"{unit} - {tool}: Executing query: {query}") + #logger.info(f"{unit} - {tool}: Executing query: {query}") cur.execute(query) results = cur.fetchall() logger.info(f"{unit} - {tool}: {cur.rowcount} rows selected to get node type/Ain/Din/channels.") diff --git a/utils/ftp/file_management.py b/utils/ftp/file_management.py index 648efc9..75bf657 100644 --- a/utils/ftp/file_management.py +++ b/utils/ftp/file_management.py @@ -9,7 +9,7 @@ from utils.config.parser import extract_value logger = logging.getLogger(__name__) -def on_file_received(self, file): +def on_file_received(self: object, file: str) -> None: """Handles the event when a file is successfully received. Args: diff --git a/utils/ftp/user_admin.py b/utils/ftp/user_admin.py index 02f3c15..54eb82d 100644 --- a/utils/ftp/user_admin.py +++ b/utils/ftp/user_admin.py @@ -9,7 +9,7 @@ from utils.database.connection import connetti_db logger = logging.getLogger(__name__) -def ftp_SITE_ADDU(self, line): +def ftp_SITE_ADDU(self: object, line: str) -> None: """Adds a virtual user, creates their directory, and saves their details to the database. """ cfg = self.cfg @@ -50,7 +50,7 @@ def ftp_SITE_ADDU(self, line): self.respond(f'501 SITE ADDU failed: {e}.') print(e) -def ftp_SITE_DISU(self, line): +def ftp_SITE_DISU(self: object, line: str) -> None: """Removes a virtual user from the authorizer and marks them as deleted in the database.""" cfg = self.cfg parms = line.split() @@ -77,7 +77,7 @@ def ftp_SITE_DISU(self, line): self.respond('501 SITE DISU failed.') print(e) -def ftp_SITE_ENAU(self, line): +def ftp_SITE_ENAU(self: object, line: str) -> None: """Restores a virtual user by updating their status in the database and adding them back to the authorizer.""" cfg = self.cfg parms = line.split() @@ -116,7 +116,7 @@ def ftp_SITE_ENAU(self, line): self.respond('501 SITE ENAU failed.') print(e) -def ftp_SITE_LSTU(self, line): +def ftp_SITE_LSTU(self: object, line: str) -> None: """Lists all virtual users from the database.""" cfg = self.cfg users_list = [] diff --git a/utils/parsers/cr1000x_cr1000x.py b/utils/parsers/cr1000x_cr1000x.py index a57a82a..44b949d 100644 --- a/utils/parsers/cr1000x_cr1000x.py +++ b/utils/parsers/cr1000x_cr1000x.py @@ -1,3 +1,3 @@ -def chi_sono(unit, tool): +def main_loader(unit, tool): print(f'{__name__}: {unit} - {tool}') return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/data_preparation.py b/utils/parsers/data_preparation.py index 6e00764..27dcac4 100644 --- a/utils/parsers/data_preparation.py +++ b/utils/parsers/data_preparation.py @@ -4,6 +4,8 @@ import utils.timestamp.date_check as date_check import logging import re +from itertools import islice + logger = logging.getLogger(__name__) def get_data(cfg: object, id: int) -> tuple: @@ -92,9 +94,27 @@ def make_loc_matrix(cfg: object, id: int) -> list: matrice_valori = [] pattern = r'(?:\d{4}/\d{2}/\d{2}|\d{2}/\d{2}/\d{4}) \d{2}:\d{2}:\d{2}(;[^;]+)+' for riga in [riga for riga in righe if re.match(pattern, riga)]: - timestamp, batlevel, temperature, ain1, ain2, din1, din2, = riga.split(';') - EventDate, EventTime = timestamp.split(' ') - valori = [ain1, ain2, din1, din2] - matrice_valori.append([UnitName, ToolNameID, 1, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori)))) + timestamp, battery_voltage, unit_temperature, analog_input1, analog_input2, digital_input1, digital_input2 = riga.split(';') + event_date, event_time = timestamp.split(' ') + valori = [analog_input1, analog_input2, digital_input1, digital_input2] + matrice_valori.append([UnitName, ToolNameID, 1, date_check.conforma_data(event_date), event_time, battery_voltage, unit_temperature] + valori + ([None] * (19 - len(valori)))) + + return matrice_valori + +def make_matrix_with_channels(cfg: object, id: int, node_channels: list) -> list: + UnitName, ToolNameID, ToolData = get_data(cfg, id) + righe = ToolData.splitlines() + matrice_valori = [] + for riga in [riga for riga in righe if ';|;' in riga]: + timestamp, batlevel, temperature, rilevazioni = riga.split(';',3) + EventDate, EventTime = timestamp.split(' ') + valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|'] + valori_iter = iter(valori_splitted) + valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels] + + for num_nodo, valori in enumerate(valori_nodi, start=1): + matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori)))) + + return matrice_valori + - return matrice_valori \ No newline at end of file diff --git a/utils/parsers/g801_iptm.py b/utils/parsers/g801_iptm.py index a57a82a..862f614 100644 --- a/utils/parsers/g801_iptm.py +++ b/utils/parsers/g801_iptm.py @@ -1,3 +1,4 @@ -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +from .g801_mums import main_loader as g801_mums_main_loader + +def main_loader(cfg: object, id: int) -> None: + return g801_mums_main_loader(cfg, id) \ No newline at end of file diff --git a/utils/parsers/g801_loc.py b/utils/parsers/g801_loc.py index f09a209..a5933f3 100644 --- a/utils/parsers/g801_loc.py +++ b/utils/parsers/g801_loc.py @@ -1,5 +1,5 @@ #!.venv/bin/python -from utils.database.loader import load_data +from utils.database.loader_action import load_data from utils.parsers.data_preparation import make_loc_matrix import logging diff --git a/utils/parsers/g801_mums.py b/utils/parsers/g801_mums.py index f01c8f5..3afa64b 100644 --- a/utils/parsers/g801_mums.py +++ b/utils/parsers/g801_mums.py @@ -1,6 +1,6 @@ #!.venv/bin/python # Import necessary modules -from utils.database.loader import load_data +from utils.database.loader_action import load_data, update_status, DATA_LOADED from utils.parsers.data_preparation import make_matrix import logging @@ -11,4 +11,5 @@ def main_loader(cfg: object, id: int) -> None: # Create a matrix of values from the data matrice_valori = make_matrix(cfg, id) # Load the data into the database - load_data(cfg, matrice_valori) \ No newline at end of file + if load_data(cfg, matrice_valori): + update_status(cfg, id, DATA_LOADED) diff --git a/utils/parsers/g802_dsas.py b/utils/parsers/g802_dsas.py index 77dc2c8..862f614 100644 --- a/utils/parsers/g802_dsas.py +++ b/utils/parsers/g802_dsas.py @@ -1,5 +1,4 @@ -import time -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - time.sleep(20) - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +from .g801_mums import main_loader as g801_mums_main_loader + +def main_loader(cfg: object, id: int) -> None: + return g801_mums_main_loader(cfg, id) \ No newline at end of file diff --git a/utils/parsers/g802_loc.py b/utils/parsers/g802_loc.py index 77dc2c8..e1eb9fc 100644 --- a/utils/parsers/g802_loc.py +++ b/utils/parsers/g802_loc.py @@ -1,5 +1,4 @@ -import time -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - time.sleep(20) - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +from .g801_loc import main_loader as g801_loc_main_loader + +def main_loader(cfg: object, id: int) -> None: + return g801_loc_main_loader(cfg, id) \ No newline at end of file diff --git a/utils/parsers/g802_modb.py b/utils/parsers/g802_modb.py index 77dc2c8..862f614 100644 --- a/utils/parsers/g802_modb.py +++ b/utils/parsers/g802_modb.py @@ -1,5 +1,4 @@ -import time -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - time.sleep(20) - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +from .g801_mums import main_loader as g801_mums_main_loader + +def main_loader(cfg: object, id: int) -> None: + return g801_mums_main_loader(cfg, id) \ No newline at end of file diff --git a/utils/parsers/g802_mums.py b/utils/parsers/g802_mums.py index 77dc2c8..862f614 100644 --- a/utils/parsers/g802_mums.py +++ b/utils/parsers/g802_mums.py @@ -1,5 +1,4 @@ -import time -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - time.sleep(20) - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +from .g801_mums import main_loader as g801_mums_main_loader + +def main_loader(cfg: object, id: int) -> None: + return g801_mums_main_loader(cfg, id) \ No newline at end of file diff --git a/utils/parsers/g802_mux.py b/utils/parsers/g802_mux.py index 77dc2c8..8bb681a 100644 --- a/utils/parsers/g802_mux.py +++ b/utils/parsers/g802_mux.py @@ -1,5 +1,4 @@ -import time -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - time.sleep(20) - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +from .g801_mux import main_loader as g801_mux_main_loader + +def main_loader(cfg: object, id: int) -> None: + return g801_mux_main_loader(cfg, id) \ No newline at end of file diff --git a/utils/parsers/gs1_gs1.py b/utils/parsers/gs1_gs1.py index 77dc2c8..aca048c 100644 --- a/utils/parsers/gs1_gs1.py +++ b/utils/parsers/gs1_gs1.py @@ -1,5 +1,4 @@ -import time -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - time.sleep(20) - return f'{__name__}: {unit} - {tool}' \ No newline at end of file +from .tlp_tlp import main_loader as tlp_tlp_main_loader + +def main_loader(cfg: object, id: int) -> None: + return tlp_tlp_main_loader(cfg, id) \ No newline at end of file diff --git a/utils/parsers/hortus_hortus.py b/utils/parsers/hortus_hortus.py new file mode 100644 index 0000000..6cc31a3 --- /dev/null +++ b/utils/parsers/hortus_hortus.py @@ -0,0 +1,4 @@ +from .cr1000x_cr1000x import main_loader as cr1000x_cr1000x_main_loader + +def main_loader(cfg: object, id: int) -> None: + return cr1000x_cr1000x_main_loader(cfg, id) \ No newline at end of file diff --git a/utils/parsers/tlp_loc.py b/utils/parsers/tlp_loc.py deleted file mode 100644 index 77dc2c8..0000000 --- a/utils/parsers/tlp_loc.py +++ /dev/null @@ -1,5 +0,0 @@ -import time -def chi_sono(unit, tool): - print(f'{__name__}: {unit} - {tool}') - time.sleep(20) - return f'{__name__}: {unit} - {tool}' \ No newline at end of file