diff --git a/elab_orchestrator.py b/elab_orchestrator.py
index 7b7c9fe..cf24112 100755
--- a/elab_orchestrator.py
+++ b/elab_orchestrator.py
@@ -9,7 +9,7 @@ import asyncio
 import subprocess
 
 # Import custom modules for configuration and database connection
-from utils.config import loader as setting
+from utils.config import loader_ftp_csv as setting
 from utils.database.connection import connetti_db
 from utils.database.loader_action import DATA_LOADED, get_matlab_cmd
diff --git a/env/db.ini b/env/db.ini
new file mode 100644
index 0000000..78a3531
--- /dev/null
+++ b/env/db.ini
@@ -0,0 +1,15 @@
+# to generete adminuser password hash:
+# python3 -c 'from hashlib import sha256;print(sha256("????password???".encode("UTF-8")).hexdigest())'
+
+[db]
+    hostname = 10.211.114.173
+    port = 3306
+    user = root
+    password = batt1l0
+    dbName = ase_lar
+
+[tables]
+    userTableName = virtusers
+    recTableName = received
+    rawTableName = RAWDATACOR
+    nodesTableName = nodes
diff --git a/env/elab.ini b/env/elab.ini
new file mode 100644
index 0000000..e69de29
diff --git a/ftp_csv_receiver.ini b/env/ftp.ini
similarity index 74%
rename from ftp_csv_receiver.ini
rename to env/ftp.ini
index d0b9a43..a8574e9 100644
--- a/ftp_csv_receiver.ini
+++ b/env/ftp.ini
@@ -3,7 +3,6 @@
 [ftpserver]
     firstPort = 40000
-    logFilename = ./ftppylog.log
     proxyAddr = 0.0.0.0
     portRangeWidth = 500
     virtpath = /home/alex/aseftp/
@@ -17,20 +16,8 @@
 [csvfs]
     path = /home/alex/aseftp/csvfs/
 
-[csvelab]
-    logFilename = csvElab.log
-    max_threads = 10
-
-[db]
-    hostname = 10.211.114.173
-    port = 3306
-    user = root
-    password = batt1l0
-    dbName = ase_lar
-    userTableName = virtusers
-    recTableName = received
-    rawTableName = RAWDATACOR
-    nodesTableName = nodes
+[logging]
+    logFilename = ./ftp_csv_rec.log
 
 [unit]
     Types = G801|G201|G301|G802|D2W|GFLOW|CR1000X|TLP|GS1|HORTUS
diff --git a/env/load.ini b/env/load.ini
new file mode 100644
index 0000000..39ce66c
--- /dev/null
+++ b/env/load.ini
@@ -0,0 +1,5 @@
+[logging]
+    logFilename = ./load_raw_data.log
+
+[threads]
+    max_num = 10
\ No newline at end of file
diff --git a/ftp_csv_receiver.py b/ftp_csv_receiver.py
index 6241eeb..4937c77 100755
--- a/ftp_csv_receiver.py
+++ b/ftp_csv_receiver.py
@@ -6,7 +6,7 @@ import logging
 from hashlib import sha256
 from pathlib import Path
 
-from utils.config import loader
+from utils.config import loader_ftp_csv
 from utils.database.connection import connetti_db
 from utils.ftp import user_admin, file_management
@@ -114,7 +114,7 @@ class ASEHandler(FTPHandler):
 def main():
     """Main function to start the FTP server."""
     # Load the configuration settings
-    cfg = loader.Config()
+    cfg = loader_ftp_csv.Config()
 
     try:
         # Initialize the authorizer and handler
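
Note: with this patch each entry point layers its own ini file on top of the shared env/db.ini. A minimal sketch of that layering, using only section and key names that appear in env/ftp.ini and env/db.ini above (not part of the patch):

    from configparser import ConfigParser

    c = ConfigParser()
    # Later files override earlier ones on key collisions; these two are disjoint.
    found = c.read(["env/ftp.ini", "env/db.ini"])
    print(found)                             # files that were actually parsed
    print(c.get("logging", "logFilename"))   # ./ftp_csv_rec.log  (env/ftp.ini)
    print(c.get("db", "dbName"))             # ase_lar            (env/db.ini)
    print(c.get("tables", "rawTableName"))   # RAWDATACOR         (env/db.ini)

ConfigParser.read() silently skips files it cannot find, and the paths are relative, so the services have to be started from the repository root for this layout to resolve.
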
diff --git a/load_orchestrator.py b/load_orchestrator.py
index 5fa2956..1323163 100755
--- a/load_orchestrator.py
+++ b/load_orchestrator.py
@@ -5,21 +5,21 @@ import mysql.connector
 import logging
 import importlib
 import asyncio
+import os
 
 # Import custom modules for configuration and database connection
-from utils.config import loader as setting
+from utils.config import loader_load_data as setting
 from utils.database.connection import connetti_db
 from utils.database.loader_action import CSV_RECEIVED
 
 # Initialize the logger for this module
-logger = logging.getLogger(__name__)
+logger = logging.getLogger()
 
 # Delay tra un processamento CSV e il successivo (in secondi)
 CSV_PROCESSING_DELAY = 0.1
 
 # Tempo di attesa se non ci sono record da elaborare
 NO_RECORD_SLEEP = 20
-
 
 async def worker(worker_id: int, queue: asyncio.Queue, cfg: object) -> None:
     """
     Worker asyncrono che preleva lavori dalla coda e li esegue.
@@ -29,6 +29,7 @@ async def worker(worker_id: int, queue: asyncio.Queue, cfg: object) -> None:
         queue (asyncio.Queue): Coda da cui prendere i lavori.
         cfg (object): Configurazione caricata.
     """
+    debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
     logger.info(f"Worker {worker_id} - Avviato")
 
     while True:
@@ -48,12 +49,12 @@ async def worker(worker_id: int, queue: asyncio.Queue, cfg: object) -> None:
                 await asyncio.sleep(CSV_PROCESSING_DELAY)
             else:
                 logger.debug(f"Worker {worker_id} - Elaborazione completata correttamente")
-                await asyncio.sleep(CSV_PROCESSING_DELAY)
+                await asyncio.sleep(CSV_PROCESSING_DELAY*worker_id)
 
             # Segnala che il lavoro è completato
             queue.task_done()
 
         except Exception as e:
-            logger.error(f"Worker {worker_id} - Errore durante l'esecuzione: {e}", exc_info=True)
+            logger.error(f"Worker {worker_id} - Errore durante l'esecuzione: {e}", exc_info=debug_mode)
             queue.task_done()
@@ -67,6 +68,8 @@ async def load_csv(cfg: object) -> tuple:
     Returns:
         bool: True se è stato trovato ed elaborato un record, False altrimenti.
     """
+
+    debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
     logger.debug("Inizio ricerca nuovo CSV da elaborare")
 
     try:
@@ -107,14 +110,14 @@ async def load_csv(cfg: object) -> tuple:
                 logger.info(f"Elaborazione completata per ID={id}")
                 return True, True
             except (ImportError, AttributeError) as e:
-                logger.error(f"Errore nel caricamento del modulo o della funzione: {e}", exc_info=True)
+                logger.error(f"Errore nel caricamento del modulo o della funzione: {e}", exc_info=debug_mode)
                 return True, False
         else:
             logger.debug("Nessun record disponibile per l'elaborazione")
             return False, False
 
     except mysql.connector.Error as e:
-        logger.error(f"Errore di database: {e}", exc_info=True)
+        logger.error(f"Errore di database: {e}", exc_info=debug_mode)
         return False, False
@@ -127,15 +130,18 @@
     try:
         # Configura il logging globale
+        log_level = os.getenv("LOG_LEVEL", "INFO").upper()
+        debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
+
         logging.basicConfig(
             format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
             filename=cfg.logfilename,
-            level=logging.INFO,
+            level=log_level,
         )
         logger.info("Logging configurato correttamente")
 
         # Crea una coda di lavoro illimitata
-        queue = asyncio.Queue(maxsize=cfg.queue_maxsize or 10)
+        queue = asyncio.Queue(maxsize=cfg.max_threads * 2 or 20)
         logger.debug("Coda di lavoro creata")
 
         # Numero massimo di worker concorrenti
@@ -169,12 +175,12 @@
         for task in workers:
             task.cancel()
 
-        await asyncio.gather(*workers, return_exceptions=True)
+        await asyncio.gather(*workers, return_exceptions=debug_mode)
 
         logger.info("Info: Tutti i task terminati. Uscita.")
 
     except Exception as e:
-        logger.error(f"Errore principale: {e}", exc_info=True)
+        logger.error(f"Errore principale: {e}", exc_info=debug_mode)
 
 
 if __name__ == "__main__":
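
Note: main() now takes the log level from the LOG_LEVEL environment variable and sizes the queue from cfg.max_threads. A minimal sketch of the same bounded-queue worker-pool shape (illustrative only: the job names and the fake job loop are assumptions, not code from load_orchestrator.py):

    import asyncio
    import logging
    import os

    logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO").upper())
    logger = logging.getLogger()

    async def worker(worker_id: int, queue: asyncio.Queue) -> None:
        while True:
            job = await queue.get()        # block until a job is available
            try:
                logger.debug("Worker %s handling job %s", worker_id, job)
                await asyncio.sleep(0.1)   # placeholder for the real CSV load
            finally:
                queue.task_done()          # always release the queue slot

    async def main() -> None:
        max_threads = 10                   # e.g. [threads] max_num in env/load.ini
        queue: asyncio.Queue = asyncio.Queue(maxsize=max_threads * 2)
        workers = [asyncio.create_task(worker(i, queue)) for i in range(max_threads)]
        for job in range(5):
            await queue.put(job)
        await queue.join()                 # wait until every job is task_done()
        for task in workers:
            task.cancel()
        # return_exceptions=True keeps the cancelled workers from re-raising here
        await asyncio.gather(*workers, return_exceptions=True)

    asyncio.run(main())
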
c.getint("threads", "max_num") + + # DB setting + self.dbhost = c.get("db", "hostname") + self.dbport = c.getint("db", "port") + self.dbuser = c.get("db", "user") + self.dbpass = c.get("db", "password") + self.dbname = c.get("db", "dbName") + + # Tables + self.dbusertable = c.get("tables", "userTableName") + self.dbrectable = c.get("tables", "recTableName") + self.dbrawdata = c.get("tables", "rawTableName") + self.dbrawdata = c.get("tables", "rawTableName") + self.dbnodes = c.get("tables", "nodesTableName") diff --git a/utils/csv/__init__.py b/utils/csv/__init__.py new file mode 100644 index 0000000..645f1c4 --- /dev/null +++ b/utils/csv/__init__.py @@ -0,0 +1 @@ +"""Parser delle centraline""" diff --git a/utils/parsers/data_preparation.py b/utils/csv/data_preparation.py similarity index 100% rename from utils/parsers/data_preparation.py rename to utils/csv/data_preparation.py diff --git a/utils/config/parser.py b/utils/csv/parser.py similarity index 100% rename from utils/config/parser.py rename to utils/csv/parser.py diff --git a/utils/ftp/file_management.py b/utils/ftp/file_management.py index 75bf657..ec5584a 100644 --- a/utils/ftp/file_management.py +++ b/utils/ftp/file_management.py @@ -5,7 +5,7 @@ import mysql.connector from utils.database.connection import connetti_db -from utils.config.parser import extract_value +from utils.csv.parser import extract_value logger = logging.getLogger(__name__) diff --git a/utils/parsers/cr1000x_cr1000x.py b/utils/parsers/cr1000x_cr1000x.py index 5ec47c0..be949f6 100644 --- a/utils/parsers/cr1000x_cr1000x.py +++ b/utils/parsers/cr1000x_cr1000x.py @@ -1,7 +1,7 @@ #!.venv/bin/python # Import necessary modules from utils.database.loader_action import load_data, update_status, DATA_LOADED -from utils.parsers.data_preparation import make_matrix +from utils.csv.data_preparation import make_matrix import logging logger = logging.getLogger(__name__) diff --git a/utils/parsers/d2w_d2w.py b/utils/parsers/d2w_d2w.py index 5ec47c0..be949f6 100644 --- a/utils/parsers/d2w_d2w.py +++ b/utils/parsers/d2w_d2w.py @@ -1,7 +1,7 @@ #!.venv/bin/python # Import necessary modules from utils.database.loader_action import load_data, update_status, DATA_LOADED -from utils.parsers.data_preparation import make_matrix +from utils.csv.data_preparation import make_matrix import logging logger = logging.getLogger(__name__) diff --git a/utils/parsers/g201_g201.py b/utils/parsers/g201_g201.py index 5ec47c0..be949f6 100644 --- a/utils/parsers/g201_g201.py +++ b/utils/parsers/g201_g201.py @@ -1,7 +1,7 @@ #!.venv/bin/python # Import necessary modules from utils.database.loader_action import load_data, update_status, DATA_LOADED -from utils.parsers.data_preparation import make_matrix +from utils.csv.data_preparation import make_matrix import logging logger = logging.getLogger(__name__) diff --git a/utils/parsers/g301_g301.py b/utils/parsers/g301_g301.py index 5ec47c0..be949f6 100644 --- a/utils/parsers/g301_g301.py +++ b/utils/parsers/g301_g301.py @@ -1,7 +1,7 @@ #!.venv/bin/python # Import necessary modules from utils.database.loader_action import load_data, update_status, DATA_LOADED -from utils.parsers.data_preparation import make_matrix +from utils.csv.data_preparation import make_matrix import logging logger = logging.getLogger(__name__) diff --git a/utils/parsers/g801_iptm.py b/utils/parsers/g801_iptm.py index 862f614..83df75e 100644 --- a/utils/parsers/g801_iptm.py +++ b/utils/parsers/g801_iptm.py @@ -1,4 +1,4 @@ from .g801_mums import main_loader as g801_mums_main_loader -def 
diff --git a/utils/parsers/g801_iptm.py b/utils/parsers/g801_iptm.py
index 862f614..83df75e 100644
--- a/utils/parsers/g801_iptm.py
+++ b/utils/parsers/g801_iptm.py
@@ -1,4 +1,4 @@
 from .g801_mums import main_loader as g801_mums_main_loader
 
-def main_loader(cfg: object, id: int) -> None:
+async def main_loader(cfg: object, id: int) -> None:
     return g801_mums_main_loader(cfg, id)
\ No newline at end of file
diff --git a/utils/parsers/g801_loc.py b/utils/parsers/g801_loc.py
index a5933f3..15eef76 100644
--- a/utils/parsers/g801_loc.py
+++ b/utils/parsers/g801_loc.py
@@ -1,11 +1,11 @@
 #!.venv/bin/python
 from utils.database.loader_action import load_data
-from utils.parsers.data_preparation import make_loc_matrix
+from utils.csv.data_preparation import make_loc_matrix
 import logging
 
 logger = logging.getLogger(__name__)
 
-def main_loader(cfg, id):
+async def main_loader(cfg, id):
     matrice_valori = make_loc_matrix(cfg, id)
     load_data(cfg, matrice_valori)
diff --git a/utils/parsers/g801_mums.py b/utils/parsers/g801_mums.py
index 0c96fcb..dab129b 100644
--- a/utils/parsers/g801_mums.py
+++ b/utils/parsers/g801_mums.py
@@ -1,7 +1,7 @@
 #!.venv/bin/python
 # Import necessary modules
 from utils.database.loader_action import load_data, update_status, DATA_LOADED
-from utils.parsers.data_preparation import make_matrix
+from utils.csv.data_preparation import make_matrix
 import logging
 
 logger = logging.getLogger(__name__)
diff --git a/utils/parsers/g801_musa.py b/utils/parsers/g801_musa.py
index 5ec47c0..be949f6 100644
--- a/utils/parsers/g801_musa.py
+++ b/utils/parsers/g801_musa.py
@@ -1,7 +1,7 @@
 #!.venv/bin/python
 # Import necessary modules
 from utils.database.loader_action import load_data, update_status, DATA_LOADED
-from utils.parsers.data_preparation import make_matrix
+from utils.csv.data_preparation import make_matrix
 import logging
 
 logger = logging.getLogger(__name__)
diff --git a/utils/parsers/g801_mux.py b/utils/parsers/g801_mux.py
index 5ec47c0..be949f6 100644
--- a/utils/parsers/g801_mux.py
+++ b/utils/parsers/g801_mux.py
@@ -1,7 +1,7 @@
 #!.venv/bin/python
 # Import necessary modules
 from utils.database.loader_action import load_data, update_status, DATA_LOADED
-from utils.parsers.data_preparation import make_matrix
+from utils.csv.data_preparation import make_matrix
 import logging
 
 logger = logging.getLogger(__name__)
diff --git a/utils/parsers/g802_dsas.py b/utils/parsers/g802_dsas.py
index 862f614..83df75e 100644
--- a/utils/parsers/g802_dsas.py
+++ b/utils/parsers/g802_dsas.py
@@ -1,4 +1,4 @@
 from .g801_mums import main_loader as g801_mums_main_loader
 
-def main_loader(cfg: object, id: int) -> None:
+async def main_loader(cfg: object, id: int) -> None:
     return g801_mums_main_loader(cfg, id)
\ No newline at end of file
diff --git a/utils/parsers/g802_gd.py b/utils/parsers/g802_gd.py
index 5ec47c0..be949f6 100644
--- a/utils/parsers/g802_gd.py
+++ b/utils/parsers/g802_gd.py
@@ -1,7 +1,7 @@
 #!.venv/bin/python
 # Import necessary modules
 from utils.database.loader_action import load_data, update_status, DATA_LOADED
-from utils.parsers.data_preparation import make_matrix
+from utils.csv.data_preparation import make_matrix
 import logging
 
 logger = logging.getLogger(__name__)
diff --git a/utils/parsers/g802_loc.py b/utils/parsers/g802_loc.py
index e1eb9fc..f00b14f 100644
--- a/utils/parsers/g802_loc.py
+++ b/utils/parsers/g802_loc.py
@@ -1,4 +1,4 @@
 from .g801_loc import main_loader as g801_loc_main_loader
 
-def main_loader(cfg: object, id: int) -> None:
+async def main_loader(cfg: object, id: int) -> None:
     return g801_loc_main_loader(cfg, id)
\ No newline at end of file
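
Note: these wrapper modules now declare main_loader as async while still returning the delegate's result directly. That is fine while the delegate stays synchronous (as g801_mums is here); when the delegate is itself a coroutine function, as g801_loc.main_loader now is, the call needs an await so the work actually runs. A minimal sketch of the awaited form for the g802_loc case:

    from .g801_loc import main_loader as g801_loc_main_loader

    async def main_loader(cfg: object, id: int) -> None:
        return await g801_loc_main_loader(cfg, id)
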
diff --git a/utils/parsers/g802_modb.py b/utils/parsers/g802_modb.py
index 862f614..83df75e 100644
--- a/utils/parsers/g802_modb.py
+++ b/utils/parsers/g802_modb.py
@@ -1,4 +1,4 @@
 from .g801_mums import main_loader as g801_mums_main_loader
 
-def main_loader(cfg: object, id: int) -> None:
+async def main_loader(cfg: object, id: int) -> None:
     return g801_mums_main_loader(cfg, id)
\ No newline at end of file
diff --git a/utils/parsers/g802_mums.py b/utils/parsers/g802_mums.py
index 862f614..83df75e 100644
--- a/utils/parsers/g802_mums.py
+++ b/utils/parsers/g802_mums.py
@@ -1,4 +1,4 @@
 from .g801_mums import main_loader as g801_mums_main_loader
 
-def main_loader(cfg: object, id: int) -> None:
+async def main_loader(cfg: object, id: int) -> None:
     return g801_mums_main_loader(cfg, id)
\ No newline at end of file
diff --git a/utils/parsers/g802_mux.py b/utils/parsers/g802_mux.py
index 8bb681a..ba50936 100644
--- a/utils/parsers/g802_mux.py
+++ b/utils/parsers/g802_mux.py
@@ -1,4 +1,4 @@
 from .g801_mux import main_loader as g801_mux_main_loader
 
-def main_loader(cfg: object, id: int) -> None:
+async def main_loader(cfg: object, id: int) -> None:
     return g801_mux_main_loader(cfg, id)
\ No newline at end of file
diff --git a/utils/parsers/gs1_gs1.py b/utils/parsers/gs1_gs1.py
index aca048c..fd58c48 100644
--- a/utils/parsers/gs1_gs1.py
+++ b/utils/parsers/gs1_gs1.py
@@ -1,4 +1,4 @@
 from .tlp_tlp import main_loader as tlp_tlp_main_loader
 
-def main_loader(cfg: object, id: int) -> None:
+async def main_loader(cfg: object, id: int) -> None:
     return tlp_tlp_main_loader(cfg, id)
\ No newline at end of file
diff --git a/utils/parsers/hortus_hortus.py b/utils/parsers/hortus_hortus.py
index 6cc31a3..2501f94 100644
--- a/utils/parsers/hortus_hortus.py
+++ b/utils/parsers/hortus_hortus.py
@@ -1,4 +1,4 @@
 from .cr1000x_cr1000x import main_loader as cr1000x_cr1000x_main_loader
 
-def main_loader(cfg: object, id: int) -> None:
+async def main_loader(cfg: object, id: int) -> None:
     return cr1000x_cr1000x_main_loader(cfg, id)
\ No newline at end of file
diff --git a/utils/parsers/tlp_tlp.py b/utils/parsers/tlp_tlp.py
index 5ec47c0..be949f6 100644
--- a/utils/parsers/tlp_tlp.py
+++ b/utils/parsers/tlp_tlp.py
@@ -1,7 +1,7 @@
 #!.venv/bin/python
 # Import necessary modules
 from utils.database.loader_action import load_data, update_status, DATA_LOADED
-from utils.parsers.data_preparation import make_matrix
+from utils.csv.data_preparation import make_matrix
 import logging
 
 logger = logging.getLogger(__name__)
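
Note: the imports shared by tlp_tlp.py, cr1000x_cr1000x.py and the other full parsers imply a common skeleton. A minimal sketch of that flow (the call shapes of make_matrix and load_data are inferred from g801_loc.py above; the update_status arguments are an assumption):

    from utils.database.loader_action import load_data, update_status, DATA_LOADED
    from utils.csv.data_preparation import make_matrix
    import logging

    logger = logging.getLogger(__name__)

    async def main_loader(cfg, id):
        matrix = make_matrix(cfg, id)         # build the value matrix from the CSV
        load_data(cfg, matrix)                # insert the rows into the raw-data table
        update_status(cfg, id, DATA_LOADED)   # assumed signature: mark the record loaded
        logger.info("Record %s loaded", id)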