From b1ce9061b18e7b0aada5ed31997a112770f86bd9 Mon Sep 17 00:00:00 2001 From: alex Date: Fri, 11 Jul 2025 22:06:45 +0200 Subject: [PATCH] add comment --- elab_orchestrator.py | 50 ++- env/elab.ini | 6 +- load_orchestrator.py | 88 +++-- test/test_ftp_csv_receicer.py | 546 -------------------------------- utils/database/connection.py | 11 +- utils/database/loader_action.py | 73 ++++- utils/database/matlab_query.py | 15 + utils/database/nodes_query.py | 14 +- 8 files changed, 212 insertions(+), 591 deletions(-) delete mode 100644 test/test_ftp_csv_receicer.py diff --git a/elab_orchestrator.py b/elab_orchestrator.py index 45477ac..df64276 100755 --- a/elab_orchestrator.py +++ b/elab_orchestrator.py @@ -6,7 +6,6 @@ import asyncio import os import aiomysql import contextvars -import subprocess # Import custom modules for configuration and database connection @@ -21,7 +20,16 @@ worker_context = contextvars.ContextVar('worker_id', default='00') # Formatter personalizzato che include il worker_id class WorkerFormatter(logging.Formatter): + """Formatter personalizzato che include l'ID del worker nei log.""" def format(self, record): + """Formatta il record di log includendo l'ID del worker. + + Args: + record: Il record di log da formattare. + + Returns: + La stringa formattata del record di log. + """ record.worker_id = worker_context.get() return super().format(record) @@ -34,6 +42,17 @@ ELAB_PROCESSING_DELAY = 0.2 NO_RECORD_SLEEP = 60 async def worker(worker_id: int, cfg: object, pool) -> None: + """Esegue il ciclo di lavoro per l'elaborazione dei dati caricati. + + Il worker preleva un record dal database che indica dati pronti per + l'elaborazione, esegue un comando Matlab associato e attende + prima di iniziare un nuovo ciclo. + + Args: + worker_id (int): L'ID univoco del worker. + cfg (object): L'oggetto di configurazione. + pool: Il pool di connessioni al database. + """ # Imposta il context per questo worker worker_context.set(f"W{worker_id}") @@ -54,14 +73,19 @@ async def worker(worker_id: int, cfg: object, pool) -> None: # matlab_error_filename = f'{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt' - success = await subprocess.run(matlab_cmd, - cwd=cfg.matlab_func_path, - capture_output=True, - text=True, - check=True) + proc = await asyncio.create_subprocess_shell( + matlab_cmd, + cwd=cfg.matlab_func_path, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) - if not success: + stdout, stderr = await proc.communicate() + + if proc.returncode != 0: logger.error("Errore durante l'elaborazione") + logger.error(stderr.decode().strip()) + logger.info(stdout.decode().strip()) await asyncio.sleep(ELAB_PROCESSING_DELAY) else: logger.info("Nessun record disponibile") @@ -73,7 +97,15 @@ async def worker(worker_id: int, cfg: object, pool) -> None: async def main(): - """Main function: avvia i worker e gestisce il ciclo principale.""" + """Funzione principale che inizializza e avvia il sistema di elaborazione. + + Questa funzione si occupa di: + - Caricare la configurazione. + - Impostare il logging. + - Creare un pool di connessioni al database. + - Avviare i worker concorrenti per l'elaborazione. + - Gestire l'arresto controllato del sistema. + """ logger.info("Avvio del sistema...") cfg = setting.Config() @@ -106,7 +138,7 @@ async def main(): user=cfg.dbuser, password=cfg.dbpass, db=cfg.dbname, - minsize=4, + minsize=cfg.max_threads, maxsize=cfg.max_threads*4, pool_recycle=3600 ) diff --git a/env/elab.ini b/env/elab.ini index 5a6ccb6..fb04fd1 100644 --- a/env/elab.ini +++ b/env/elab.ini @@ -5,8 +5,10 @@ max_num = 10 [matlab] - runtime = /usr/local/MATLAB/MATLAB_Runtime/v93 - func_path = /usr/local/matlab_func/ + #runtime = /usr/local/MATLAB/MATLAB_Runtime/v93 + #func_path = /usr/local/matlab_func/ + runtime = /home/alex/matlab_sym/ + func_path = /home/alex/matlab_sym/ timeout = 1800 error = "" error_path = /tmp/ diff --git a/load_orchestrator.py b/load_orchestrator.py index 52e7997..32145eb 100755 --- a/load_orchestrator.py +++ b/load_orchestrator.py @@ -15,14 +15,26 @@ from utils.database import CSV_RECEIVED from utils.csv.loaders import get_next_csv_atomic # Crea una context variable per identificare il worker -worker_context = contextvars.ContextVar('worker_id', default='00') +worker_context = contextvars.ContextVar("worker_id", default="00") + # Formatter personalizzato che include il worker_id class WorkerFormatter(logging.Formatter): + """Formatter personalizzato per i log che include l'ID del worker.""" + def format(self, record): + """Formatta il record di log includendo l'ID del worker. + + Args: + record: Il record di log da formattare. + + Returns: + La stringa formattata del record di log. + """ record.worker_id = worker_context.get() return super().format(record) + # Initialize the logger for this module logger = logging.getLogger() @@ -31,11 +43,22 @@ CSV_PROCESSING_DELAY = 0.2 # Tempo di attesa se non ci sono record da elaborare NO_RECORD_SLEEP = 60 + async def worker(worker_id: int, cfg: object, pool) -> None: + """Esegue il ciclo di lavoro per l'elaborazione dei file CSV. + + Il worker preleva un record CSV dal database, ne elabora il contenuto + e attende prima di iniziare un nuovo ciclo. + + Args: + worker_id (int): L'ID univoco del worker. + cfg (object): L'oggetto di configurazione. + pool: Il pool di connessioni al database. + """ # Imposta il context per questo worker worker_context.set(f"W{worker_id}") - debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG) + debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG logger.info("Avviato") while True: @@ -57,18 +80,35 @@ async def worker(worker_id: int, cfg: object, pool) -> None: logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode) await asyncio.sleep(1) + async def load_csv(record: tuple, cfg: object, pool) -> bool: - debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG) + """Carica ed elabora un record CSV utilizzando il modulo di parsing appropriato. + + Args: + record: Una tupla contenente i dettagli del record CSV da elaborare (id, unit_type, tool_type, unit_name, tool_name). + cfg: L'oggetto di configurazione contenente i parametri del sistema. + pool: Il pool di connessioni al database. + + Returns: + True se l'elaborazione del CSV è avvenuta con successo, False altrimenti. + """ + debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG logger.debug("Inizio ricerca nuovo CSV da elaborare") - id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record] - logger.info(f'Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}') + id, unit_type, tool_type, unit_name, tool_name = [ + x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record + ] + logger.info( + f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}" + ) # Costruisce il nome del modulo da caricare dinamicamente - module_names = [f'utils.parsers.by_name.{unit_name}_{tool_name}', - f'utils.parsers.by_name.{unit_name}_{tool_type}', - f'utils.parsers.by_name.{unit_name}_all', - f'utils.parsers.by_type.{unit_type}_{tool_type}'] + module_names = [ + f"utils.parsers.by_name.{unit_name}_{tool_name}", + f"utils.parsers.by_name.{unit_name}_{tool_type}", + f"utils.parsers.by_name.{unit_name}_all", + f"utils.parsers.by_type.{unit_type}_{tool_type}", + ] modulo = None for module_name in module_names: try: @@ -77,7 +117,10 @@ async def load_csv(record: tuple, cfg: object, pool) -> bool: logger.info(f"Funzione 'main_loader' caricata dal modulo {module_name}") break except (ImportError, AttributeError) as e: - logger.debug(f"Modulo {module_name} non presente o non valido. {e}", exc_info=debug_mode) + logger.debug( + f"Modulo {module_name} non presente o non valido. {e}", + exc_info=debug_mode, + ) if not modulo: logger.error(f"Nessun modulo trovato {module_names}") @@ -92,8 +135,17 @@ async def load_csv(record: tuple, cfg: object, pool) -> bool: logger.info(f"Elaborazione completata per ID={id}") return True + async def main(): - """Main function: avvia i worker e gestisce il ciclo principale.""" + """Funzione principale che inizializza e avvia il sistema. + + Questa funzione si occupa di: + - Caricare la configurazione. + - Impostare il logging. + - Creare un pool di connessioni al database. + - Avviare i worker concorrenti per l'elaborazione dei CSV. + - Gestire l'arresto controllato del sistema. + """ logger.info("Avvio del sistema...") cfg = setting.Config() @@ -102,7 +154,7 @@ async def main(): try: # Configura il logging globale log_level = os.getenv("LOG_LEVEL", "INFO").upper() - debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG) + debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG # Configura il logging con il formatter personalizzato handler = logging.FileHandler(cfg.logfilename) @@ -126,15 +178,14 @@ async def main(): user=cfg.dbuser, password=cfg.dbpass, db=cfg.dbname, - minsize=4, - maxsize=cfg.max_threads*4, - pool_recycle=3600 + minsize=cfg.max_threads, + maxsize=cfg.max_threads * 4, + pool_recycle=3600, ) # Avvia i worker workers = [ - asyncio.create_task(worker(i, cfg, pool)) - for i in range(cfg.max_threads) + asyncio.create_task(worker(i, cfg, pool)) for i in range(cfg.max_threads) ] logger.info("Sistema avviato correttamente. In attesa di nuovi task...") @@ -151,5 +202,6 @@ async def main(): except Exception as e: logger.error(f"Errore principale: {e}", exc_info=debug_mode) + if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/test/test_ftp_csv_receicer.py b/test/test_ftp_csv_receicer.py deleted file mode 100644 index 26cb362..0000000 --- a/test/test_ftp_csv_receicer.py +++ /dev/null @@ -1,546 +0,0 @@ -import unittest -import os -import sys -from unittest.mock import patch, MagicMock, mock_open, call, ANY -from hashlib import sha256 -from pathlib import Path -from types import SimpleNamespace # Used to create mock config objects - -# Add the parent directory to sys.path to allow importing FtpCsvReceiver -# Adjust this path if your test file is located differently -script_dir = os.path.dirname(os.path.abspath(__file__)) -parent_dir = os.path.dirname(script_dir) -# If FtpCsvReceiver.py is in the same directory as the test file, you might not need this -# If it's in the parent directory (like /home/alex/devel/ASE/), use this: -sys.path.insert(0, parent_dir) - -# Now import the components to test -# We need to import AFTER modifying sys.path if necessary -# Also, mock dependencies BEFORE importing the module that uses them -# Mock mysql.connector BEFORE importing FtpCsvReceiver -mock_mysql_connector = MagicMock() -sys.modules['mysql.connector'] = mock_mysql_connector - -# Mock the custom utils modules as well if they aren't available in the test environment -mock_utils_time = MagicMock() -mock_utils_config = MagicMock() -sys.modules['utils.time'] = mock_utils_time -sys.modules['utils.config'] = mock_utils_config -# Mock the setting.config() call specifically -mock_config_instance = MagicMock() -mock_utils_config.set_config.config.return_value = mock_config_instance - -# Mock pyftpdlib classes if needed for specific tests, but often mocking methods is enough -# sys.modules['pyftpdlib.handlers'] = MagicMock() -# sys.modules['pyftpdlib.authorizers'] = MagicMock() -# sys.modules['pyftpdlib.servers'] = MagicMock() - -# Import the module AFTER mocking dependencies -import ftp_csv_receiver -from ftp_csv_receiver import ( - extract_value, - DummySha256Authorizer, - ASEHandler, - conn_db, # Import even though we mock mysql.connector -) - -# --- Test Configuration Setup --- -def create_mock_cfg(): - """Creates a mock configuration object for testing.""" - cfg = SimpleNamespace() - cfg.adminuser = ['admin', sha256(b'adminpass').hexdigest(), '/fake/admin/path', 'elradfmwMT'] - cfg.dbhost = 'mockhost' - cfg.dbport = 3306 - cfg.dbuser = 'mockuser' - cfg.dbpass = 'mockpass' - cfg.dbname = 'mockdb' - cfg.dbusertable = 'mock_virtusers' - cfg.dbrectable = 'mock_received' - cfg.virtpath = '/fake/ftp/root/' - cfg.defperm = 'elmw' - cfg.fileext = ['.CSV', '.TXT'] - # Add patterns as lists of strings - cfg.units_name = [r'ID\d{4}', r'IX\d{4}'] - cfg.units_type = [r'G801', r'G201'] - cfg.tools_name = [r'LOC\d{4}', r'DT\d{4}'] - cfg.tools_type = [r'MUX', r'MUMS'] - # Add other necessary config values - cfg.logfilename = 'test_ftp.log' - cfg.proxyaddr = '0.0.0.0' - cfg.firstport = 40000 - cfg.portrangewidth = 10 - return cfg - -# --- Test Cases --- - -class TestExtractValue(unittest.TestCase): - - def test_extract_from_primary(self): - patterns = [r'ID(\d+)'] - primary = "File_ID1234_data.csv" - secondary = "Some other text" - self.assertEqual(extract_value(patterns, primary, secondary), "ID1234") - - def test_extract_from_secondary(self): - patterns = [r'Type(A|B)'] - primary = "Filename_without_type.txt" - secondary = "Log data: TypeB found" - self.assertEqual(extract_value(patterns, primary, secondary), "TypeB") - - def test_no_match(self): - patterns = [r'XYZ\d+'] - primary = "File_ID1234_data.csv" - secondary = "Log data: TypeB found" - self.assertEqual(extract_value(patterns, primary, secondary, default="NotFound"), "NotFound") - - def test_case_insensitive(self): - patterns = [r'id(\d+)'] - primary = "File_ID1234_data.csv" - secondary = "Some other text" - self.assertEqual(extract_value(patterns, primary, secondary), "ID1234") # Note: re.findall captures original case - - def test_multiple_patterns(self): - patterns = [r'Type(A|B)', r'ID(\d+)'] - primary = "File_ID1234_data.csv" - secondary = "Log data: TypeB found" - # Should match the first pattern found in the primary source - self.assertEqual(extract_value(patterns, primary, secondary), "ID1234") - - def test_multiple_patterns_secondary_match(self): - patterns = [r'XYZ\d+', r'Type(A|B)'] - primary = "File_ID1234_data.csv" - secondary = "Log data: TypeB found" - # Should match the second pattern in the secondary source - self.assertEqual(extract_value(patterns, primary, secondary), "TypeB") - - -class TestDummySha256Authorizer(unittest.TestCase): - - def setUp(self): - self.mock_cfg = create_mock_cfg() - # Mock the database connection and cursor - self.mock_conn = MagicMock() - self.mock_cursor = MagicMock() - mock_mysql_connector.connect.return_value = self.mock_conn - self.mock_conn.cursor.return_value = self.mock_cursor - - @patch('FtpCsvReceiver.Path') # Mock Path object - def test_init_loads_users(self, mock_path_constructor): - # Mock Path instance methods - mock_path_instance = MagicMock() - mock_path_constructor.return_value = mock_path_instance - - # Simulate database result - db_users = [ - ('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr'), - ('user2', sha256(b'pass2').hexdigest(), '/fake/ftp/root/user2', 'elmw'), - ] - self.mock_cursor.fetchall.return_value = db_users - - authorizer = DummySha256Authorizer(self.mock_cfg) - - # Verify DB connection - mock_mysql_connector.connect.assert_called_once_with( - user=self.mock_cfg.dbuser, password=self.mock_cfg.dbpass, - host=self.mock_cfg.dbhost, port=self.mock_cfg.dbport - ) - # Verify query - self.mock_cursor.execute.assert_called_once_with( - f'SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL' - ) - # Verify admin user added - self.assertIn('admin', authorizer.user_table) - self.assertEqual(authorizer.user_table['admin']['pwd'], self.mock_cfg.adminuser[1]) - # Verify DB users added - self.assertIn('user1', authorizer.user_table) - self.assertEqual(authorizer.user_table['user1']['pwd'], db_users[0][1]) - self.assertEqual(authorizer.user_table['user1']['home'], db_users[0][2]) - self.assertEqual(authorizer.user_table['user1']['perm'], db_users[0][3]) - self.assertIn('user2', authorizer.user_table) - # Verify directories were "created" - expected_path_calls = [ - call(self.mock_cfg.virtpath + 'user1'), - call(self.mock_cfg.virtpath + 'user2'), - ] - mock_path_constructor.assert_has_calls(expected_path_calls, any_order=True) - self.assertEqual(mock_path_instance.mkdir.call_count, 2) - mock_path_instance.mkdir.assert_called_with(parents=True, exist_ok=True) - - @patch('FtpCsvReceiver.Path') - def test_init_mkdir_exception(self, mock_path_constructor): - # Simulate database result - db_users = [('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr')] - self.mock_cursor.fetchall.return_value = db_users - - # Mock Path to raise an exception - mock_path_instance = MagicMock() - mock_path_constructor.return_value = mock_path_instance - mock_path_instance.mkdir.side_effect = OSError("Permission denied") - - # We expect initialization to continue, but maybe log an error (though the code uses self.responde which isn't available here) - # For a unit test, we just check that the user is still added - authorizer = DummySha256Authorizer(self.mock_cfg) - self.assertIn('user1', authorizer.user_table) - mock_path_instance.mkdir.assert_called_once() - - - def test_validate_authentication_success(self): - self.mock_cursor.fetchall.return_value = [] # No DB users for simplicity - authorizer = DummySha256Authorizer(self.mock_cfg) - # Test admin user - authorizer.validate_authentication('admin', 'adminpass', None) # Handler not used in this method - - def test_validate_authentication_wrong_password(self): - self.mock_cursor.fetchall.return_value = [] - authorizer = DummySha256Authorizer(self.mock_cfg) - with self.assertRaises(ftp_csv_receiver.AuthenticationFailed): - authorizer.validate_authentication('admin', 'wrongpass', None) - - def test_validate_authentication_unknown_user(self): - self.mock_cursor.fetchall.return_value = [] - authorizer = DummySha256Authorizer(self.mock_cfg) - with self.assertRaises(ftp_csv_receiver.AuthenticationFailed): - authorizer.validate_authentication('unknown', 'somepass', None) - - -class TestASEHandler(unittest.TestCase): - - def setUp(self): - self.mock_cfg = create_mock_cfg() - self.mock_conn = MagicMock() # Mock FTP connection object - self.mock_server = MagicMock() # Mock FTP server object - self.mock_authorizer = MagicMock(spec=DummySha256Authorizer) # Mock authorizer - - # Instantiate the handler - # We need to manually set cfg and authorizer as done in main() - self.handler = ASEHandler(self.mock_conn, self.mock_server) - self.handler.cfg = self.mock_cfg - self.handler.authorizer = self.mock_authorizer - self.handler.respond = MagicMock() # Mock the respond method - self.handler.push = MagicMock() # Mock the push method - - # Mock database for handler methods - self.mock_db_conn = MagicMock() - self.mock_db_cursor = MagicMock() - # Patch conn_db globally for this test class - self.patcher_conn_db = patch('FtpCsvReceiver.conn_db', return_value=self.mock_db_conn) - self.mock_conn_db = self.patcher_conn_db.start() - self.mock_db_conn.cursor.return_value = self.mock_db_cursor - - # Mock logging - self.patcher_logging = patch('FtpCsvReceiver.logging') - self.mock_logging = self.patcher_logging.start() - - - def tearDown(self): - # Stop the patchers - self.patcher_conn_db.stop() - self.patcher_logging.stop() - # Reset mocks if needed between tests (though setUp does this) - mock_mysql_connector.reset_mock() - - - @patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV')) - @patch('FtpCsvReceiver.os.path.splitext', return_value=('ID1234_data', '.CSV')) - @patch('FtpCsvReceiver.os.stat') - @patch('FtpCsvReceiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3') - @patch('FtpCsvReceiver.os.remove') - @patch('FtpCsvReceiver.extract_value') # Mock extract_value for focused testing - def test_on_file_received_success(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split): - mock_os_stat.return_value.st_size = 100 # Non-empty file - test_file_path = '/fake/ftp/root/user1/ID1234_data.CSV' - - # Setup mock return values for extract_value - mock_extract.side_effect = ['ID1234', 'G801', 'LOC5678', 'MUX'] - - self.handler.on_file_received(test_file_path) - - # Verify file stats checked - mock_os_stat.assert_called_once_with(test_file_path) - # Verify file opened - mock_file_open.assert_called_once_with(test_file_path, 'r') - # Verify path splitting - mock_split.assert_called_once_with(test_file_path) - mock_splitext.assert_called_once_with('ID1234_data.CSV') - # Verify extract_value calls - expected_extract_calls = [ - call(self.mock_cfg.units_name, 'ID1234_data', ANY), # ANY for the lines string - call(self.mock_cfg.units_type, 'ID1234_data', ANY), - call(self.mock_cfg.tools_name, 'ID1234_data', ANY), - call(self.mock_cfg.tools_type, 'ID1234_data', ANY), - ] - mock_extract.assert_has_calls(expected_extract_calls) - # Verify DB connection - self.mock_conn_db.assert_called_once_with(self.mock_cfg) - # Verify DB insert - expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbrectable } (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s, %s, %s, %s, %s, %s)" - expected_data = ('ID1234_data', 'ID1234', 'G801', 'LOC5678', 'MUX', 'G801,col2,col3\nval1,val2,val3') - self.mock_db_cursor.execute.assert_called_once_with(expected_sql, expected_data) - self.mock_db_conn.commit.assert_called_once() - self.mock_db_conn.close.assert_called_once() - # Verify file removed - mock_os_remove.assert_called_once_with(test_file_path) - # Verify logging - self.mock_logging.info.assert_called_with(f'File {test_file_path} loaded: removed.') - - @patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'data.WRONGEXT')) - @patch('FtpCsvReceiver.os.path.splitext', return_value=('data', '.WRONGEXT')) - @patch('FtpCsvReceiver.os.stat') - @patch('FtpCsvReceiver.os.remove') - def test_on_file_received_wrong_extension(self, mock_os_remove, mock_os_stat, mock_splitext, mock_split): - mock_os_stat.return_value.st_size = 100 - test_file_path = '/fake/ftp/root/user1/data.WRONGEXT' - - self.handler.on_file_received(test_file_path) - - # Verify only stat, split, and splitext were called - mock_os_stat.assert_called_once_with(test_file_path) - mock_split.assert_called_once_with(test_file_path) - mock_splitext.assert_called_once_with('data.WRONGEXT') - # Verify DB, open, remove were NOT called - self.mock_conn_db.assert_not_called() - mock_os_remove.assert_not_called() - self.mock_logging.info.assert_not_called() # No logging in this path - - @patch('FtpCsvReceiver.os.stat') - @patch('FtpCsvReceiver.os.remove') - def test_on_file_received_empty_file(self, mock_os_remove, mock_os_stat): - mock_os_stat.return_value.st_size = 0 # Empty file - test_file_path = '/fake/ftp/root/user1/empty.CSV' - - self.handler.on_file_received(test_file_path) - - # Verify stat called - mock_os_stat.assert_called_once_with(test_file_path) - # Verify file removed - mock_os_remove.assert_called_once_with(test_file_path) - # Verify logging - self.mock_logging.info.assert_called_with(f'File {test_file_path} was empty: removed.') - # Verify DB not called - self.mock_conn_db.assert_not_called() - - @patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV')) - @patch('FtpCsvReceiver.os.path.splitext', return_value=('ID1234_data', '.CSV')) - @patch('FtpCsvReceiver.os.stat') - @patch('FtpCsvReceiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3') - @patch('FtpCsvReceiver.os.remove') - @patch('FtpCsvReceiver.extract_value', side_effect=['ID1234', 'G801', 'LOC5678', 'MUX']) - def test_on_file_received_db_error(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split): - mock_os_stat.return_value.st_size = 100 - test_file_path = '/fake/ftp/root/user1/ID1234_data.CSV' - db_error = Exception("DB connection failed") - self.mock_db_cursor.execute.side_effect = db_error # Simulate DB error - - self.handler.on_file_received(test_file_path) - - # Verify DB interaction attempted - self.mock_conn_db.assert_called_once_with(self.mock_cfg) - self.mock_db_cursor.execute.assert_called_once() - # Verify commit/close not called after error - self.mock_db_conn.commit.assert_not_called() - self.mock_db_conn.close.assert_not_called() # Should close be called in finally? Original code doesn't. - # Verify file was NOT removed - mock_os_remove.assert_not_called() - # Verify error logging - self.mock_logging.error.assert_any_call(f'File {test_file_path} not loaded. Held in user path.') - self.mock_logging.error.assert_any_call(f'{db_error}') - - @patch('FtpCsvReceiver.os.remove') - def test_on_incomplete_file_received(self, mock_os_remove): - test_file_path = '/fake/ftp/root/user1/incomplete.part' - self.handler.on_incomplete_file_received(test_file_path) - mock_os_remove.assert_called_once_with(test_file_path) - - @patch('FtpCsvReceiver.Path') - @patch('FtpCsvReceiver.os.path.basename', return_value='newuser') - def test_ftp_SITE_ADDU_success(self, mock_basename, mock_path_constructor): - mock_path_instance = MagicMock() - mock_path_constructor.return_value = mock_path_instance - password = 'newpassword' - expected_hash = sha256(password.encode("UTF-8")).hexdigest() - expected_home = self.mock_cfg.virtpath + 'newuser' - - self.handler.ftp_SITE_ADDU(f'newuser {password}') - - # Verify path creation - mock_path_constructor.assert_called_once_with(expected_home) - mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True) - # Verify authorizer call - self.handler.authorizer.add_user.assert_called_once_with( - 'newuser', expected_hash, expected_home + '/', perm=self.mock_cfg.defperm # Note: Original code adds trailing slash here - ) - # Verify DB interaction - self.mock_conn_db.assert_called_once_with(self.mock_cfg) - expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('newuser', '{expected_hash}', '{expected_home}', '{self.mock_cfg.defperm}')" - self.mock_db_cursor.execute.assert_called_once_with(expected_sql) - self.mock_db_conn.commit.assert_called_once() - self.mock_db_conn.close.assert_called_once() - # Verify response - self.handler.respond.assert_called_once_with('200 SITE ADDU successful.') - # Verify logging - self.mock_logging.info.assert_called_with('User newuser created.') - - def test_ftp_SITE_ADDU_missing_args(self): - self.handler.ftp_SITE_ADDU('newuser') # Missing password - self.handler.respond.assert_called_once_with('501 SITE ADDU failed. Command needs 2 arguments') - self.handler.authorizer.add_user.assert_not_called() - self.mock_conn_db.assert_not_called() - - @patch('FtpCsvReceiver.Path') - @patch('FtpCsvReceiver.os.path.basename', return_value='newuser') - def test_ftp_SITE_ADDU_mkdir_error(self, mock_basename, mock_path_constructor): - mock_path_instance = MagicMock() - mock_path_constructor.return_value = mock_path_instance - error = OSError("Cannot create dir") - mock_path_instance.mkdir.side_effect = error - - self.handler.ftp_SITE_ADDU('newuser newpassword') - - self.handler.respond.assert_called_once_with(f'551 Error in create virtual user path: {error}') - self.handler.authorizer.add_user.assert_not_called() - self.mock_conn_db.assert_not_called() - - @patch('FtpCsvReceiver.Path') - @patch('FtpCsvReceiver.os.path.basename', return_value='newuser') - def test_ftp_SITE_ADDU_db_error(self, mock_basename, mock_path_constructor): - mock_path_instance = MagicMock() - mock_path_constructor.return_value = mock_path_instance - error = Exception("DB insert failed") - self.mock_db_cursor.execute.side_effect = error - - self.handler.ftp_SITE_ADDU('newuser newpassword') - - # Verify mkdir called - mock_path_instance.mkdir.assert_called_once() - # Verify authorizer called (happens before DB) - self.handler.authorizer.add_user.assert_called_once() - # Verify DB interaction attempted - self.mock_conn_db.assert_called_once() - self.mock_db_cursor.execute.assert_called_once() - # Verify response - self.handler.respond.assert_called_once_with(f'501 SITE ADDU failed: {error}.') - - - @patch('FtpCsvReceiver.os.path.basename', return_value='olduser') - def test_ftp_SITE_DELU_success(self, mock_basename): - self.handler.ftp_SITE_DELU('olduser') - - # Verify authorizer call - self.handler.authorizer.remove_user.assert_called_once_with('olduser') - # Verify DB interaction - self.mock_conn_db.assert_called_once_with(self.mock_cfg) - expected_sql = f"UPDATE {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = now() WHERE ftpuser = 'olduser'" - self.mock_db_cursor.execute.assert_called_once_with(expected_sql) - self.mock_db_conn.commit.assert_called_once() - self.mock_db_conn.close.assert_called_once() - # Verify response - self.handler.respond.assert_called_once_with('200 SITE DELU successful.') - # Verify logging - self.mock_logging.info.assert_called_with('User olduser deleted.') - - @patch('FtpCsvReceiver.os.path.basename', return_value='olduser') - def test_ftp_SITE_DELU_error(self, mock_basename): - error = Exception("DB update failed") - self.mock_db_cursor.execute.side_effect = error - - self.handler.ftp_SITE_DELU('olduser') - - # Verify authorizer call (happens first) - self.handler.authorizer.remove_user.assert_called_once_with('olduser') - # Verify DB interaction attempted - self.mock_conn_db.assert_called_once() - self.mock_db_cursor.execute.assert_called_once() - # Verify response - self.handler.respond.assert_called_once_with('501 SITE DELU failed.') - - @patch('FtpCsvReceiver.Path') - @patch('FtpCsvReceiver.os.path.basename', return_value='restoreme') - def test_ftp_SITE_RESU_success(self, mock_basename, mock_path_constructor): - mock_path_instance = MagicMock() - mock_path_constructor.return_value = mock_path_instance - user_data = ('restoreme', 'somehash', '/fake/ftp/root/restoreme', 'elmw') - self.mock_db_cursor.fetchone.return_value = user_data - - self.handler.ftp_SITE_RESU('restoreme') - - # Verify DB interaction - self.mock_conn_db.assert_called_once_with(self.mock_cfg) - expected_update_sql = f"UPDATE {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = null WHERE ftpuser = 'restoreme'" - expected_select_sql = f"SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE ftpuser = 'restoreme'" - expected_db_calls = [ - call(expected_update_sql), - call(expected_select_sql) - ] - self.mock_db_cursor.execute.assert_has_calls(expected_db_calls) - self.mock_db_conn.commit.assert_called_once() # For the update - self.mock_db_cursor.fetchone.assert_called_once() - # Verify authorizer call - self.handler.authorizer.add_user.assert_called_once_with(*user_data) - # Verify path creation - mock_path_constructor.assert_called_once_with(self.mock_cfg.virtpath + 'restoreme') - mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True) - # Verify DB close - self.mock_db_conn.close.assert_called_once() - # Verify response - self.handler.respond.assert_called_once_with('200 SITE RESU successful.') - # Verify logging - self.mock_logging.info.assert_called_with('User restoreme restored.') - - @patch('FtpCsvReceiver.os.path.basename', return_value='restoreme') - def test_ftp_SITE_RESU_db_error(self, mock_basename): - error = Exception("DB fetch failed") - # Simulate error on the SELECT statement - self.mock_db_cursor.execute.side_effect = [None, error] # First call (UPDATE) ok, second (SELECT) fails - - self.handler.ftp_SITE_RESU('restoreme') - - # Verify DB interaction attempted - self.mock_conn_db.assert_called_once() - self.assertEqual(self.mock_db_cursor.execute.call_count, 2) # Both UPDATE and SELECT attempted - self.mock_db_conn.commit.assert_called_once() # Commit for UPDATE happened - # Verify response - self.handler.respond.assert_called_once_with('501 SITE RESU failed.') - # Verify authorizer not called, mkdir not called - self.handler.authorizer.add_user.assert_not_called() - - - def test_ftp_SITE_LSTU_success(self): - user_list_data = [ - ('userA', 'elr'), - ('userB', 'elmw'), - ] - self.mock_db_cursor.fetchall.return_value = user_list_data - - self.handler.ftp_SITE_LSTU('') # No argument needed - - # Verify DB interaction - self.mock_conn_db.assert_called_once_with(self.mock_cfg) - expected_sql = f'SELECT ftpuser, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL ' - self.mock_db_cursor.execute.assert_called_once_with(expected_sql) - self.mock_db_cursor.fetchall.assert_called_once() - # Verify push calls - expected_push_calls = [ - call("214-The following virtual users are defined:\r\n"), - call('Username: userA\tPerms: elr\r\nUsername: userB\tPerms: elmw\r\n') - ] - self.handler.push.assert_has_calls(expected_push_calls) - # Verify final response - self.handler.respond.assert_called_once_with("214 LSTU SITE command successful.") - - def test_ftp_SITE_LSTU_db_error(self): - error = Exception("DB select failed") - self.mock_db_cursor.execute.side_effect = error - - self.handler.ftp_SITE_LSTU('') - - # Verify DB interaction attempted - self.mock_conn_db.assert_called_once() - self.mock_db_cursor.execute.assert_called_once() - # Verify response - self.handler.respond.assert_called_once_with(f'501 list users failed: {error}') - # Verify push not called - self.handler.push.assert_not_called() - - -if __name__ == '__main__': - unittest.main(argv=['first-arg-is-ignored'], exit=False) diff --git a/utils/database/connection.py b/utils/database/connection.py index a48e395..269e36f 100644 --- a/utils/database/connection.py +++ b/utils/database/connection.py @@ -1,9 +1,10 @@ import logging import mysql.connector +from mysql.connector import Error logger = logging.getLogger(__name__) -def nnn_connetti_db(cfg: object) -> object: +def connetti_db(cfg: object) -> object: """ Establishes a connection to a MySQL database. @@ -20,10 +21,14 @@ def nnn_connetti_db(cfg: object) -> object: A MySQL connection object if the connection is successful, otherwise None. """ try: - conn = mysql.connector.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport, database=cfg.dbname) + conn = mysql.connector.connect(user=cfg.dbuser, + password=cfg.dbpass, + host=cfg.dbhost, + port=cfg.dbport, + database=cfg.dbname) conn.autocommit = True logger.info("Connected") return conn - except mysql.connector.Error as e: + except Error as e: logger.error(f"Database connection error: {e}") raise # Re-raise the exception to be handled by the caller \ No newline at end of file diff --git a/utils/database/loader_action.py b/utils/database/loader_action.py index 885c049..16855d0 100644 --- a/utils/database/loader_action.py +++ b/utils/database/loader_action.py @@ -4,14 +4,28 @@ import asyncio logger = logging.getLogger(__name__) -timestamp_cols = ['inserted_at', 'loaded_at', 'elaborated_at'] +timestamp_cols = ["inserted_at", "loaded_at", "elaborated_at"] -async def load_data(cfg: object, matrice_valori: list, pool) -> bool : +async def load_data(cfg: object, matrice_valori: list, pool) -> bool: + """Carica una lista di record di dati grezzi nel database. + + Esegue un'operazione di inserimento massivo (executemany) per caricare i dati. + Utilizza la clausola 'ON DUPLICATE KEY UPDATE' per aggiornare i record esistenti. + Implementa una logica di re-tentativo in caso di deadlock. + + Args: + cfg (object): L'oggetto di configurazione contenente i nomi delle tabelle e i parametri di re-tentativo. + matrice_valori (list): Una lista di tuple, dove ogni tupla rappresenta una riga da inserire. + pool: Il pool di connessioni al database. + + Returns: + bool: True se il caricamento ha avuto successo, False altrimenti. + """ if not matrice_valori: logger.info("Nulla da caricare.") return True - sql_insert_RAWDATA = f''' + sql_insert_RAWDATA = f""" INSERT INTO {cfg.dbrawdata} ( `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`, `Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`, @@ -47,7 +61,7 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool : `TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, {cfg.dbrawdata}.`TemperatureModule`), `RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`), `Created_at` = NOW() - ''' + """ rc = False async with pool.acquire() as conn: @@ -65,10 +79,12 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool : logging.error(f"Error: {e}.") if e.args[0] == 1213: # Deadlock detected - logging.warning(f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}") + logging.warning( + f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}" + ) if attempt < cfg.max_retries - 1: - delay = (2 * attempt) + delay = 2 * attempt await asyncio.sleep(delay) continue else: @@ -76,29 +92,64 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool : raise return rc + async def update_status(cfg: object, id: int, status: int, pool) -> None: + """Aggiorna lo stato di un record nella tabella dei record CSV. + + Args: + cfg (object): L'oggetto di configurazione contenente il nome della tabella. + id (int): L'ID del record da aggiornare. + status (int): Il nuovo stato da impostare. + pool: Il pool di connessioni al database. + """ async with pool.acquire() as conn: async with conn.cursor() as cur: try: - await cur.execute(f'update {cfg.dbrectable} set status = {status}, {timestamp_cols[status]} = now() where id = {id}') + await cur.execute( + f"update {cfg.dbrectable} set status = {status}, {timestamp_cols[status]} = now() where id = {id}" + ) await conn.commit() logging.info("Status updated.") except Exception as e: await conn.rollback() - logging.error(f'Error: {e}') + logging.error(f"Error: {e}") + async def unlock(cfg: object, id: int, pool) -> None: + """Sblocca un record nella tabella dei record CSV. + + Imposta il campo 'locked' a 0 per un dato ID. + + Args: + cfg (object): L'oggetto di configurazione contenente il nome della tabella. + id (int): L'ID del record da sbloccare. + pool: Il pool di connessioni al database. + """ async with pool.acquire() as conn: async with conn.cursor() as cur: try: - await cur.execute(f'update {cfg.dbrectable} set locked = 0 where id = {id}') + await cur.execute( + f"update {cfg.dbrectable} set locked = 0 where id = {id}" + ) await conn.commit() logging.info(f"id {id} unlocked.") except Exception as e: await conn.rollback() - logging.error(f'Error: {e}') + logging.error(f"Error: {e}") + async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool) -> tuple: + """Recupera le informazioni per l'esecuzione di un comando Matlab dal database. + + Args: + cfg (object): L'oggetto di configurazione. + unit (str): Il nome dell'unità. + tool (str): Il nome dello strumento. + pool: Il pool di connessioni al database. + + Returns: + tuple: Una tupla contenente le informazioni del comando Matlab, o None in caso di errore. + """ async with pool.acquire() as conn: async with conn.cursor() as cur: try: @@ -110,4 +161,4 @@ async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool) -> tuple: where t.name = "{tool}" and u.name = "{unit}"''') return cur.fetchone() except Exception as e: - logging.error(f'Error: {e}') \ No newline at end of file + logging.error(f"Error: {e}") diff --git a/utils/database/matlab_query.py b/utils/database/matlab_query.py index dc0a23d..23086bf 100644 --- a/utils/database/matlab_query.py +++ b/utils/database/matlab_query.py @@ -4,6 +4,21 @@ import aiomysql logger = logging.getLogger(__name__) async def get_matlab_command(cfg: object, tool: str, unit: str, pool) -> tuple: + """Recupera le informazioni per l'esecuzione di un comando Matlab dal database. + + Interroga il database per ottenere i dettagli necessari all'avvio di uno script + Matlab, basandosi sul nome dello strumento (tool) e dell'unità (unit). + + Args: + cfg (object): L'oggetto di configurazione. + tool (str): Il nome dello strumento. + unit (str): Il nome dell'unità. + pool: Il pool di connessioni al database. + + Returns: + tuple: Una tupla contenente le informazioni del comando Matlab, + o None se non viene trovato alcun comando. + """ async with pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: diff --git a/utils/database/nodes_query.py b/utils/database/nodes_query.py index 2732208..e1b9dd4 100644 --- a/utils/database/nodes_query.py +++ b/utils/database/nodes_query.py @@ -5,6 +5,18 @@ import logging logger = logging.getLogger(__name__) async def get_nodes_type(cfg: object, tool: str, unit: str, pool) -> tuple: + """Recupera le informazioni sui nodi (tipo, canali, input) per un dato strumento e unità. + + Args: + cfg (object): L'oggetto di configurazione. + tool (str): Il nome dello strumento. + unit (str): Il nome dell'unità. + pool: Il pool di connessioni al database. + + Returns: + tuple: Una tupla contenente quattro liste: canali, tipi, ain, din. + Se non vengono trovati risultati, restituisce (None, None, None, None). + """ async with pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: @@ -32,5 +44,3 @@ async def get_nodes_type(cfg: object, tool: str, unit: str, pool) -> tuple: ains.append(row['ain']) dins.append(row['din']) return channels, types, ains, dins - -