From fd5429ee0d253c6958cd5a4c86ee08d19383a433 Mon Sep 17 00:00:00 2001 From: alex Date: Thu, 1 May 2025 00:58:07 +0200 Subject: [PATCH] evol 3 --- ftpcsvreceiver.ini | 5 +- orchestrator.py | 90 ++++++++++++++----------------- utils/config/loader.py | 3 +- utils/database/loader.py | 32 +++++++++++ utils/parsers/cr1000x_cr1000x.py | 3 ++ utils/parsers/d2w_d2w.py | 3 ++ utils/parsers/data_preparation.py | 28 ++++++++++ utils/parsers/g201_g201.py | 3 ++ utils/parsers/g301_g301.py | 3 ++ utils/parsers/g801_iptm.py | 3 ++ utils/parsers/g801_loc.py | 5 ++ utils/parsers/g801_mums.py | 13 +++-- utils/parsers/g801_musa.py | 5 ++ utils/parsers/g801_mux.py | 6 ++- utils/parsers/g802_dsas.py | 5 ++ utils/parsers/g802_gd.py | 5 ++ utils/parsers/g802_loc.py | 5 ++ utils/parsers/g802_modb.py | 5 ++ utils/parsers/g802_mums.py | 5 ++ utils/parsers/g802_mux.py | 5 ++ utils/parsers/gs1_gs1.py | 5 ++ utils/parsers/tlp_loc.py | 5 ++ utils/parsers/tlp_tlp.py | 5 ++ 23 files changed, 190 insertions(+), 57 deletions(-) create mode 100644 utils/database/loader.py create mode 100644 utils/parsers/cr1000x_cr1000x.py create mode 100644 utils/parsers/d2w_d2w.py create mode 100644 utils/parsers/data_preparation.py create mode 100644 utils/parsers/g201_g201.py create mode 100644 utils/parsers/g301_g301.py create mode 100644 utils/parsers/g801_iptm.py create mode 100644 utils/parsers/g801_loc.py create mode 100644 utils/parsers/g801_musa.py create mode 100644 utils/parsers/g802_dsas.py create mode 100644 utils/parsers/g802_gd.py create mode 100644 utils/parsers/g802_loc.py create mode 100644 utils/parsers/g802_modb.py create mode 100644 utils/parsers/g802_mums.py create mode 100644 utils/parsers/g802_mux.py create mode 100644 utils/parsers/gs1_gs1.py create mode 100644 utils/parsers/tlp_loc.py create mode 100644 utils/parsers/tlp_tlp.py diff --git a/ftpcsvreceiver.ini b/ftpcsvreceiver.ini index 063fdd3..392cb7b 100644 --- a/ftpcsvreceiver.ini +++ b/ftpcsvreceiver.ini @@ -19,6 +19,7 @@ [csvelab] logFilename = csvElab.log + max_threads = 5 [db] hostname = 10.211.114.173 @@ -29,14 +30,14 @@ dbSchema = public userTableName = virtusers recTableName = received - rawTableName = rawdatacor + rawTableName = RAWDATACOR [unit] Types = G801|G201|G301|G802|D2W|GFLOW|CR1000X|TLP|GS1 Names = ID[0-9]{4}|IX[0-9]{4} [tool] - Types = MUX|MUMS|MODB|IPTM|MUSA|LOC|GD|D2W|CR1000X|G301|NESA + Types = MUX|MUMS|MODB|IPTM|MUSA|LOC|GD|D2W|CR1000X|G301|NESA|GS1|G201|TLP|DSAS Names = LOC[0-9]{4}|DT[0-9]{4}|GD[0-9]{4}|[0-9]{18}|measurement [csv] diff --git a/orchestrator.py b/orchestrator.py index a559d3c..60b3574 100755 --- a/orchestrator.py +++ b/orchestrator.py @@ -1,79 +1,71 @@ -#!/usr/bin/env python3 +#!.venv/bin/python import mysql.connector -import os -import sys import logging import importlib +import time +import threading -from utils.time import timestamp_fmt as ts from utils.config import loader as setting -#from unit_tool_mod import g801_mums, g801_mux +from utils.database.connection import connetti_db -def conn_db(cfg): - """Establishes a connection to the MySQL database. +logger = logging.getLogger(__name__) - Args: - cfg: The configuration object containing database connection details. - - Returns: - A MySQL database connection object. - """ +def elab_csv(cfg, threads): try: - conn = mysql.connector.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport) - conn.autocommit = True - return conn - except mysql.connector.Error as e: - print(f"Error: {e}") - logging.error(f'{e}') - return None - -def elab_csv(cfg): - try: - with conn_db(cfg) as conn: + with connetti_db(cfg) as conn: cur = conn.cursor() cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = 0 limit 1') id, unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone() - cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 1 where id = {id}') + if id: + cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 1 where id = {id}') + + module_name = f'utils.parsers.{unit_type.lower()}_{tool_type.lower()}' + modulo = importlib.import_module(module_name) + + funzione = getattr(modulo, "main_loader") + + # Chiamare la funzione + thread = threading.Thread(target = funzione, args=(cfg, id)) + threads.append(thread) + thread.start() + return True + else: + time.sleep(20) + return False + + except mysql.connector.Error as e: print(f"Error: {e}") - logging.error(f'{e}') - - module_name = f'unit_tool_mod.{unit_type.lower()}_{tool_type.lower()}' - modulo = importlib.import_module(module_name) - - funzione = getattr(modulo, "chi_sono") - - # Chiamare la funzione - risultato = funzione(unit_name, tool_name) - print(f'risultato: {risultato}') - + logger.error(f'{e}') def main(): # Load the configuration settings - cfg = setting.config() + cfg = setting.Config() try: # Configura la connessione al database PostgreSQL # Configure logging logging.basicConfig( - format="%(asctime)s %(message)s", - filename=cfg.elablog, + format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ", + filename=cfg.logfilename, level=logging.INFO, ) - elab_csv(cfg) + threads = [] + while True: + while len(threads) > cfg.max_threads: + for thread in threads: + if not thread.is_alive(): + threads.remove(thread) + if elab_csv(cfg, threads): + logger.info(f"Threads in execution: {len(threads)}") + pass except KeyboardInterrupt: - logging.info( - "Info: {}.".format("Shutdown requested...exiting") - ) + logger.info("Info: Shutdown requested...exiting") - except Exception: - print( - "{} - PID {:>5} >> Error: {}.".format( - ts.timestamp("log"), os.getpid(), sys.exc_info()[1] - ) - ) + except Exception as e: + logger.info(f"Error: {e}.") if __name__ == "__main__": main() \ No newline at end of file diff --git a/utils/config/loader.py b/utils/config/loader.py index 1c30a80..199cf2b 100644 --- a/utils/config/loader.py +++ b/utils/config/loader.py @@ -25,6 +25,7 @@ class Config: # LOADER setting self.elablog = c.get("csvelab", "logFilename") + self.max_threads = c.getint("csvelab", "max_threads") # DB setting self.dbhost = c.get("db", "hostname") @@ -35,7 +36,7 @@ class Config: self.dbschema = c.get("db", "dbSchema") self.dbusertable = c.get("db", "userTableName") self.dbrectable = c.get("db", "recTableName") - self.dbdataraw = c.get("db", "rawTableName") + self.dbrawdata = c.get("db", "rawTableName") # unit setting self.units_name = [part for part in c.get("unit", "Names").split('|')] diff --git a/utils/database/loader.py b/utils/database/loader.py new file mode 100644 index 0000000..20abe76 --- /dev/null +++ b/utils/database/loader.py @@ -0,0 +1,32 @@ +#!.venv/bin/python +from utils.database.connection import connetti_db +import logging + +logger = logging.getLogger(__name__) + +def load_data(cfg, matrice_valori): + sql_insert_RAWDATA = f''' + INSERT IGNORE INTO {cfg.dbname}.{cfg.dbrawdata} ( + `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`, + `Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`, + `Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`, + `BatLevelModule`,`TemperatureModule`, `RssiModule` + ) + VALUES ( + %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s, %s, %s, %s, %s, %s, + %s, %s, %s + ) + ''' + with connetti_db(cfg) as conn: + cur = conn.cursor() + try: + cur.executemany(sql_insert_RAWDATA, matrice_valori) + conn.commit() + except Exception as e: + conn.rollback() + print(f'Error: {e}') + finally: + cur.close() + conn.close() \ No newline at end of file diff --git a/utils/parsers/cr1000x_cr1000x.py b/utils/parsers/cr1000x_cr1000x.py new file mode 100644 index 0000000..a57a82a --- /dev/null +++ b/utils/parsers/cr1000x_cr1000x.py @@ -0,0 +1,3 @@ +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/d2w_d2w.py b/utils/parsers/d2w_d2w.py new file mode 100644 index 0000000..a57a82a --- /dev/null +++ b/utils/parsers/d2w_d2w.py @@ -0,0 +1,3 @@ +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/data_preparation.py b/utils/parsers/data_preparation.py new file mode 100644 index 0000000..48ca652 --- /dev/null +++ b/utils/parsers/data_preparation.py @@ -0,0 +1,28 @@ +#!.venv/bin/python +from utils.database.connection import connetti_db +import utils.timestamp.date_check as date_check +import logging + +logger = logging.getLogger(__name__) + +def get_data(cfg, id): + with connetti_db(cfg) as conn: + cur = conn.cursor() + cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbname}.{cfg.dbrectable} where id = {id}') + unit_name, tool_name, tool_data = cur.fetchone() + cur.close() + conn.close() + return unit_name, tool_name, tool_data + +def make_matrix(cfg, id): + UnitName, ToolNameID, ToolData = get_data(cfg, id) + righe = ToolData.splitlines() + matrice_valori = [] + for riga in [riga for riga in righe if ';|;' in riga]: + timestamp, batlevel, temperature, rilevazioni = riga.split(';',3) + EventDate, EventTime = timestamp.split(' ') + valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;') # Toglie '|;' iniziali, toglie eventuali ';' finali, dividi per ';|;' + for num_nodo, valori_nodo in enumerate(valori_nodi, start=1): + valori = valori_nodo.split(';') + matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori)))) + return matrice_valori \ No newline at end of file diff --git a/utils/parsers/g201_g201.py b/utils/parsers/g201_g201.py new file mode 100644 index 0000000..a57a82a --- /dev/null +++ b/utils/parsers/g201_g201.py @@ -0,0 +1,3 @@ +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/g301_g301.py b/utils/parsers/g301_g301.py new file mode 100644 index 0000000..a57a82a --- /dev/null +++ b/utils/parsers/g301_g301.py @@ -0,0 +1,3 @@ +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/g801_iptm.py b/utils/parsers/g801_iptm.py new file mode 100644 index 0000000..a57a82a --- /dev/null +++ b/utils/parsers/g801_iptm.py @@ -0,0 +1,3 @@ +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/g801_loc.py b/utils/parsers/g801_loc.py new file mode 100644 index 0000000..77dc2c8 --- /dev/null +++ b/utils/parsers/g801_loc.py @@ -0,0 +1,5 @@ +import time +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + time.sleep(20) + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/g801_mums.py b/utils/parsers/g801_mums.py index 2d790e4..4625994 100644 --- a/utils/parsers/g801_mums.py +++ b/utils/parsers/g801_mums.py @@ -1,3 +1,10 @@ -def chi_sono(unit, tool): - print(f'g801_mums: {unit} - {tool}') - return f'g801_mums: {unit} - {tool}' \ No newline at end of file +#!.venv/bin/python +from utils.database.loader import load_data +from utils.parsers.data_preparation import make_matrix +import logging + +logger = logging.getLogger(__name__) + +def main_loader(cfg, id): + matrice_valori = make_matrix(cfg, id) + load_data(cfg, matrice_valori) \ No newline at end of file diff --git a/utils/parsers/g801_musa.py b/utils/parsers/g801_musa.py new file mode 100644 index 0000000..77dc2c8 --- /dev/null +++ b/utils/parsers/g801_musa.py @@ -0,0 +1,5 @@ +import time +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + time.sleep(20) + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/g801_mux.py b/utils/parsers/g801_mux.py index 59f5adc..77dc2c8 100644 --- a/utils/parsers/g801_mux.py +++ b/utils/parsers/g801_mux.py @@ -1,3 +1,5 @@ +import time def chi_sono(unit, tool): - print(f'g801_mux: {unit} - {tool}') - return f'g801_mux: {unit} - {tool}' \ No newline at end of file + print(f'{__name__}: {unit} - {tool}') + time.sleep(20) + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/g802_dsas.py b/utils/parsers/g802_dsas.py new file mode 100644 index 0000000..77dc2c8 --- /dev/null +++ b/utils/parsers/g802_dsas.py @@ -0,0 +1,5 @@ +import time +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + time.sleep(20) + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/g802_gd.py b/utils/parsers/g802_gd.py new file mode 100644 index 0000000..77dc2c8 --- /dev/null +++ b/utils/parsers/g802_gd.py @@ -0,0 +1,5 @@ +import time +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + time.sleep(20) + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/g802_loc.py b/utils/parsers/g802_loc.py new file mode 100644 index 0000000..77dc2c8 --- /dev/null +++ b/utils/parsers/g802_loc.py @@ -0,0 +1,5 @@ +import time +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + time.sleep(20) + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/g802_modb.py b/utils/parsers/g802_modb.py new file mode 100644 index 0000000..77dc2c8 --- /dev/null +++ b/utils/parsers/g802_modb.py @@ -0,0 +1,5 @@ +import time +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + time.sleep(20) + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/g802_mums.py b/utils/parsers/g802_mums.py new file mode 100644 index 0000000..77dc2c8 --- /dev/null +++ b/utils/parsers/g802_mums.py @@ -0,0 +1,5 @@ +import time +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + time.sleep(20) + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/g802_mux.py b/utils/parsers/g802_mux.py new file mode 100644 index 0000000..77dc2c8 --- /dev/null +++ b/utils/parsers/g802_mux.py @@ -0,0 +1,5 @@ +import time +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + time.sleep(20) + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/gs1_gs1.py b/utils/parsers/gs1_gs1.py new file mode 100644 index 0000000..77dc2c8 --- /dev/null +++ b/utils/parsers/gs1_gs1.py @@ -0,0 +1,5 @@ +import time +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + time.sleep(20) + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/tlp_loc.py b/utils/parsers/tlp_loc.py new file mode 100644 index 0000000..77dc2c8 --- /dev/null +++ b/utils/parsers/tlp_loc.py @@ -0,0 +1,5 @@ +import time +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + time.sleep(20) + return f'{__name__}: {unit} - {tool}' \ No newline at end of file diff --git a/utils/parsers/tlp_tlp.py b/utils/parsers/tlp_tlp.py new file mode 100644 index 0000000..77dc2c8 --- /dev/null +++ b/utils/parsers/tlp_tlp.py @@ -0,0 +1,5 @@ +import time +def chi_sono(unit, tool): + print(f'{__name__}: {unit} - {tool}') + time.sleep(20) + return f'{__name__}: {unit} - {tool}' \ No newline at end of file