add comment

2025-07-11 22:06:45 +02:00
parent 0022d0e326
commit b1ce9061b1
8 changed files with 212 additions and 591 deletions


@@ -6,7 +6,6 @@ import asyncio
 import os
 import aiomysql
 import contextvars
-import subprocess

 # Import custom modules for configuration and database connection
@@ -21,7 +20,16 @@ worker_context = contextvars.ContextVar('worker_id', default='00')
 # Custom formatter that includes the worker_id
 class WorkerFormatter(logging.Formatter):
+    """Custom formatter that includes the worker ID in log messages."""
+
     def format(self, record):
+        """Format the log record, including the worker ID.
+
+        Args:
+            record: The log record to format.
+
+        Returns:
+            The formatted string of the log record.
+        """
         record.worker_id = worker_context.get()
         return super().format(record)
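The pattern documented here, a ContextVar read inside a custom Formatter, is what stamps each worker's ID on its log lines. A minimal self-contained sketch of the same mechanism (names illustrative, not the project's setup code):

import asyncio
import contextvars
import logging

worker_context = contextvars.ContextVar("worker_id", default="00")

class WorkerFormatter(logging.Formatter):
    def format(self, record):
        # Each task reads its own copy of the ContextVar at emit time.
        record.worker_id = worker_context.get()
        return super().format(record)

handler = logging.StreamHandler()
handler.setFormatter(WorkerFormatter("[%(worker_id)s] %(message)s"))
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)

async def demo_worker(n: int) -> None:
    worker_context.set(f"W{n}")  # visible only inside this task's context
    logging.info("started")

async def demo_main() -> None:
    # gather() wraps each coroutine in a task, and every task gets its
    # own copy of the current context, so the IDs never collide.
    await asyncio.gather(demo_worker(1), demo_worker(2))

asyncio.run(demo_main())

Running it prints one tagged line per worker: [W1] started, [W2] started.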
@@ -34,6 +42,17 @@ ELAB_PROCESSING_DELAY = 0.2
 NO_RECORD_SLEEP = 60

 async def worker(worker_id: int, cfg: object, pool) -> None:
+    """Run the work loop that processes loaded data.
+
+    The worker fetches a record from the database that marks data as
+    ready for processing, runs the associated Matlab command, and waits
+    before starting a new cycle.
+
+    Args:
+        worker_id (int): The unique ID of the worker.
+        cfg (object): The configuration object.
+        pool: The database connection pool.
+    """
     # Set the context for this worker
     worker_context.set(f"W{worker_id}")
@@ -54,14 +73,19 @@ async def worker(worker_id: int, cfg: object, pool) -> None:
             # matlab_error_filename = f'{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt'

-            success = await subprocess.run(matlab_cmd,
-                                           cwd=cfg.matlab_func_path,
-                                           capture_output=True,
-                                           text=True,
-                                           check=True)
-            if not success:
+            proc = await asyncio.create_subprocess_shell(
+                matlab_cmd,
+                cwd=cfg.matlab_func_path,
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE
+            )
+            stdout, stderr = await proc.communicate()
+            if proc.returncode != 0:
                 logger.error("Errore durante l'elaborazione")
+                logger.error(stderr.decode().strip())
+            logger.info(stdout.decode().strip())
             await asyncio.sleep(ELAB_PROCESSING_DELAY)
         else:
             logger.info("Nessun record disponibile")
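This hunk fixes a real bug: subprocess.run() is synchronous and returns a CompletedProcess, not an awaitable, so the old "await subprocess.run(...)" would raise a TypeError, and even without the await it would block the event loop for every worker. The asyncio replacement runs the command concurrently. A sketch of how the same pattern could also honor the ini's timeout = 1800 (the commit itself adds no timeout handling, so that part is an assumption):

import asyncio

async def run_matlab(matlab_cmd: str, cwd: str, timeout: int = 1800):
    # Launch the shell command without blocking the event loop.
    proc = await asyncio.create_subprocess_shell(
        matlab_cmd,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout)
    except asyncio.TimeoutError:
        proc.kill()        # reap the runaway Matlab process
        await proc.wait()
        raise
    return proc.returncode, stdout.decode(), stderr.decode()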
@@ -73,7 +97,15 @@ async def worker(worker_id: int, cfg: object, pool) -> None:
 async def main():
-    """Main function: starts the workers and manages the main loop."""
+    """Main function that initializes and starts the processing system.
+
+    This function takes care of:
+    - Loading the configuration.
+    - Setting up logging.
+    - Creating a database connection pool.
+    - Starting the concurrent workers that do the processing.
+    - Handling the controlled shutdown of the system.
+    """
     logger.info("Avvio del sistema...")

     cfg = setting.Config()
@@ -106,7 +138,7 @@ async def main():
         user=cfg.dbuser,
         password=cfg.dbpass,
         db=cfg.dbname,
-        minsize=4,
+        minsize=cfg.max_threads,
         maxsize=cfg.max_threads*4,
         pool_recycle=3600
     )
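Raising minsize from a fixed 4 to cfg.max_threads keeps one warm connection per worker, so no worker waits on a fresh TCP/auth handshake; maxsize at four times that leaves headroom for bursts. A consolidated sketch of the pool call (assuming cfg exposes the same fields used above):

import aiomysql

async def make_pool(cfg):
    return await aiomysql.create_pool(
        host=cfg.dbhost, port=cfg.dbport,
        user=cfg.dbuser, password=cfg.dbpass, db=cfg.dbname,
        minsize=cfg.max_threads,      # one warm connection per worker
        maxsize=cfg.max_threads * 4,  # headroom for bursts
        pool_recycle=3600,            # recycle connections older than 1h
    )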

env/elab.ini (vendored)

@@ -5,8 +5,10 @@
 max_num = 10

 [matlab]
-runtime = /usr/local/MATLAB/MATLAB_Runtime/v93
-func_path = /usr/local/matlab_func/
+#runtime = /usr/local/MATLAB/MATLAB_Runtime/v93
+#func_path = /usr/local/matlab_func/
+runtime = /home/alex/matlab_sym/
+func_path = /home/alex/matlab_sym/
 timeout = 1800
 error = ""
 error_path = /tmp/
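For context, these keys are plain INI values: with Python's standard configparser the leading # makes the old production paths inert while the local symlink paths take effect. A sketch, assuming the file is parsed with configparser (the project's actual Config class is not shown in this diff):

import configparser

parser = configparser.ConfigParser()
parser.read("env/elab.ini")

runtime = parser.get("matlab", "runtime")      # /home/alex/matlab_sym/
func_path = parser.get("matlab", "func_path")  # /home/alex/matlab_sym/
timeout = parser.getint("matlab", "timeout")   # 1800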


@@ -15,14 +15,26 @@ from utils.database import CSV_RECEIVED
 from utils.csv.loaders import get_next_csv_atomic

 # Create a context variable to identify the worker
-worker_context = contextvars.ContextVar('worker_id', default='00')
+worker_context = contextvars.ContextVar("worker_id", default="00")

 # Custom formatter that includes the worker_id
 class WorkerFormatter(logging.Formatter):
+    """Custom log formatter that includes the worker ID."""
+
     def format(self, record):
+        """Format the log record, including the worker ID.
+
+        Args:
+            record: The log record to format.
+
+        Returns:
+            The formatted string of the log record.
+        """
         record.worker_id = worker_context.get()
         return super().format(record)

 # Initialize the logger for this module
 logger = logging.getLogger()
@@ -31,11 +43,22 @@ CSV_PROCESSING_DELAY = 0.2
 # Wait time when there are no records to process
 NO_RECORD_SLEEP = 60

 async def worker(worker_id: int, cfg: object, pool) -> None:
+    """Run the work loop that processes CSV files.
+
+    The worker fetches a CSV record from the database, processes its
+    content, and waits before starting a new cycle.
+
+    Args:
+        worker_id (int): The unique ID of the worker.
+        cfg (object): The configuration object.
+        pool: The database connection pool.
+    """
     # Set the context for this worker
     worker_context.set(f"W{worker_id}")

-    debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
+    debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
     logger.info("Avviato")

     while True:
@@ -57,18 +80,35 @@ async def worker(worker_id: int, cfg: object, pool) -> None:
             logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
             await asyncio.sleep(1)

 async def load_csv(record: tuple, cfg: object, pool) -> bool:
-    debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
+    """Load and process a CSV record using the appropriate parsing module.
+
+    Args:
+        record: A tuple with the details of the CSV record to process (id, unit_type, tool_type, unit_name, tool_name).
+        cfg: The configuration object holding the system parameters.
+        pool: The database connection pool.
+
+    Returns:
+        True if the CSV was processed successfully, False otherwise.
+    """
+    debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
     logger.debug("Inizio ricerca nuovo CSV da elaborare")

-    id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
-    logger.info(f'Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}')
+    id, unit_type, tool_type, unit_name, tool_name = [
+        x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record
+    ]
+    logger.info(
+        f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}"
+    )

     # Build the name of the module to load dynamically
-    module_names = [f'utils.parsers.by_name.{unit_name}_{tool_name}',
-                    f'utils.parsers.by_name.{unit_name}_{tool_type}',
-                    f'utils.parsers.by_name.{unit_name}_all',
-                    f'utils.parsers.by_type.{unit_type}_{tool_type}']
+    module_names = [
+        f"utils.parsers.by_name.{unit_name}_{tool_name}",
+        f"utils.parsers.by_name.{unit_name}_{tool_type}",
+        f"utils.parsers.by_name.{unit_name}_all",
+        f"utils.parsers.by_type.{unit_type}_{tool_type}",
+    ]
     modulo = None
     for module_name in module_names:
         try:
@@ -77,7 +117,10 @@ async def load_csv(record: tuple, cfg: object, pool) -> bool:
             logger.info(f"Funzione 'main_loader' caricata dal modulo {module_name}")
             break
         except (ImportError, AttributeError) as e:
-            logger.debug(f"Modulo {module_name} non presente o non valido. {e}", exc_info=debug_mode)
+            logger.debug(
+                f"Modulo {module_name} non presente o non valido. {e}",
+                exc_info=debug_mode,
+            )

     if not modulo:
         logger.error(f"Nessun modulo trovato {module_names}")
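The module_names list above is a fallback chain: the most specific parser (unit name plus tool name) is tried first, then progressively more generic candidates down to unit_type plus tool_type. The loop body is elided by the hunk, so this is a sketch, assuming it uses importlib as the log messages suggest:

import importlib

def find_main_loader(module_names: list):
    """Return the first main_loader callable found along the fallback chain."""
    for module_name in module_names:
        try:
            module = importlib.import_module(module_name)
            return getattr(module, "main_loader")
        except (ImportError, AttributeError):
            continue  # fall through to the next, more generic, candidate
    return None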
@@ -92,8 +135,17 @@ async def load_csv(record: tuple, cfg: object, pool) -> bool:
     logger.info(f"Elaborazione completata per ID={id}")
     return True

 async def main():
-    """Main function: starts the workers and manages the main loop."""
+    """Main function that initializes and starts the system.
+
+    This function takes care of:
+    - Loading the configuration.
+    - Setting up logging.
+    - Creating a database connection pool.
+    - Starting the concurrent workers that process the CSVs.
+    - Handling the controlled shutdown of the system.
+    """
     logger.info("Avvio del sistema...")

     cfg = setting.Config()
@@ -102,7 +154,7 @@ async def main():
     try:
         # Configure global logging
         log_level = os.getenv("LOG_LEVEL", "INFO").upper()
-        debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
+        debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG

         # Configure logging with the custom formatter
         handler = logging.FileHandler(cfg.logfilename)
@@ -126,15 +178,14 @@ async def main():
             user=cfg.dbuser,
             password=cfg.dbpass,
             db=cfg.dbname,
-            minsize=4,
+            minsize=cfg.max_threads,
             maxsize=cfg.max_threads * 4,
-            pool_recycle=3600
+            pool_recycle=3600,
         )

         # Start the workers
         workers = [
-            asyncio.create_task(worker(i, cfg, pool))
-            for i in range(cfg.max_threads)
+            asyncio.create_task(worker(i, cfg, pool)) for i in range(cfg.max_threads)
         ]

         logger.info("Sistema avviato correttamente. In attesa di nuovi task...")
@@ -151,5 +202,6 @@ async def main():
     except Exception as e:
         logger.error(f"Errore principale: {e}", exc_info=debug_mode)

 if __name__ == "__main__":
     asyncio.run(main())
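The docstring's "controlled shutdown" point is not visible in these hunks. A hypothetical helper showing what cancelling the workers and draining the pool typically looks like with aiomysql (an assumption, not the project's actual handler):

import asyncio

async def shutdown(workers: list, pool) -> None:
    for w in workers:
        w.cancel()                       # break the infinite worker loops
    await asyncio.gather(*workers, return_exceptions=True)
    pool.close()                         # stop handing out connections
    await pool.wait_closed()             # let aiomysql release them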


@@ -1,546 +0,0 @@
import unittest
import os
import sys
from unittest.mock import patch, MagicMock, mock_open, call, ANY
from hashlib import sha256
from pathlib import Path
from types import SimpleNamespace # Used to create mock config objects
# Add the parent directory to sys.path to allow importing FtpCsvReceiver
# Adjust this path if your test file is located differently
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(script_dir)
# If FtpCsvReceiver.py is in the same directory as the test file, you might not need this
# If it's in the parent directory (like /home/alex/devel/ASE/), use this:
sys.path.insert(0, parent_dir)
# Now import the components to test
# We need to import AFTER modifying sys.path if necessary
# Also, mock dependencies BEFORE importing the module that uses them
# Mock mysql.connector BEFORE importing FtpCsvReceiver
mock_mysql_connector = MagicMock()
sys.modules['mysql.connector'] = mock_mysql_connector
# Mock the custom utils modules as well if they aren't available in the test environment
mock_utils_time = MagicMock()
mock_utils_config = MagicMock()
sys.modules['utils.time'] = mock_utils_time
sys.modules['utils.config'] = mock_utils_config
# Mock the setting.config() call specifically
mock_config_instance = MagicMock()
mock_utils_config.set_config.config.return_value = mock_config_instance
# Mock pyftpdlib classes if needed for specific tests, but often mocking methods is enough
# sys.modules['pyftpdlib.handlers'] = MagicMock()
# sys.modules['pyftpdlib.authorizers'] = MagicMock()
# sys.modules['pyftpdlib.servers'] = MagicMock()
# Import the module AFTER mocking dependencies
import ftp_csv_receiver
from ftp_csv_receiver import (
extract_value,
DummySha256Authorizer,
ASEHandler,
conn_db, # Import even though we mock mysql.connector
)
# --- Test Configuration Setup ---
def create_mock_cfg():
"""Creates a mock configuration object for testing."""
cfg = SimpleNamespace()
cfg.adminuser = ['admin', sha256(b'adminpass').hexdigest(), '/fake/admin/path', 'elradfmwMT']
cfg.dbhost = 'mockhost'
cfg.dbport = 3306
cfg.dbuser = 'mockuser'
cfg.dbpass = 'mockpass'
cfg.dbname = 'mockdb'
cfg.dbusertable = 'mock_virtusers'
cfg.dbrectable = 'mock_received'
cfg.virtpath = '/fake/ftp/root/'
cfg.defperm = 'elmw'
cfg.fileext = ['.CSV', '.TXT']
# Add patterns as lists of strings
cfg.units_name = [r'ID\d{4}', r'IX\d{4}']
cfg.units_type = [r'G801', r'G201']
cfg.tools_name = [r'LOC\d{4}', r'DT\d{4}']
cfg.tools_type = [r'MUX', r'MUMS']
# Add other necessary config values
cfg.logfilename = 'test_ftp.log'
cfg.proxyaddr = '0.0.0.0'
cfg.firstport = 40000
cfg.portrangewidth = 10
return cfg
# --- Test Cases ---
class TestExtractValue(unittest.TestCase):
def test_extract_from_primary(self):
patterns = [r'ID(\d+)']
primary = "File_ID1234_data.csv"
secondary = "Some other text"
self.assertEqual(extract_value(patterns, primary, secondary), "ID1234")
def test_extract_from_secondary(self):
patterns = [r'Type(A|B)']
primary = "Filename_without_type.txt"
secondary = "Log data: TypeB found"
self.assertEqual(extract_value(patterns, primary, secondary), "TypeB")
def test_no_match(self):
patterns = [r'XYZ\d+']
primary = "File_ID1234_data.csv"
secondary = "Log data: TypeB found"
self.assertEqual(extract_value(patterns, primary, secondary, default="NotFound"), "NotFound")
def test_case_insensitive(self):
patterns = [r'id(\d+)']
primary = "File_ID1234_data.csv"
secondary = "Some other text"
self.assertEqual(extract_value(patterns, primary, secondary), "ID1234") # Note: re.findall captures original case
def test_multiple_patterns(self):
patterns = [r'Type(A|B)', r'ID(\d+)']
primary = "File_ID1234_data.csv"
secondary = "Log data: TypeB found"
# Should match the first pattern found in the primary source
self.assertEqual(extract_value(patterns, primary, secondary), "ID1234")
def test_multiple_patterns_secondary_match(self):
patterns = [r'XYZ\d+', r'Type(A|B)']
primary = "File_ID1234_data.csv"
secondary = "Log data: TypeB found"
# Should match the second pattern in the secondary source
self.assertEqual(extract_value(patterns, primary, secondary), "TypeB")
class TestDummySha256Authorizer(unittest.TestCase):
def setUp(self):
self.mock_cfg = create_mock_cfg()
# Mock the database connection and cursor
self.mock_conn = MagicMock()
self.mock_cursor = MagicMock()
mock_mysql_connector.connect.return_value = self.mock_conn
self.mock_conn.cursor.return_value = self.mock_cursor
@patch('FtpCsvReceiver.Path') # Mock Path object
def test_init_loads_users(self, mock_path_constructor):
# Mock Path instance methods
mock_path_instance = MagicMock()
mock_path_constructor.return_value = mock_path_instance
# Simulate database result
db_users = [
('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr'),
('user2', sha256(b'pass2').hexdigest(), '/fake/ftp/root/user2', 'elmw'),
]
self.mock_cursor.fetchall.return_value = db_users
authorizer = DummySha256Authorizer(self.mock_cfg)
# Verify DB connection
mock_mysql_connector.connect.assert_called_once_with(
user=self.mock_cfg.dbuser, password=self.mock_cfg.dbpass,
host=self.mock_cfg.dbhost, port=self.mock_cfg.dbport
)
# Verify query
self.mock_cursor.execute.assert_called_once_with(
f'SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL'
)
# Verify admin user added
self.assertIn('admin', authorizer.user_table)
self.assertEqual(authorizer.user_table['admin']['pwd'], self.mock_cfg.adminuser[1])
# Verify DB users added
self.assertIn('user1', authorizer.user_table)
self.assertEqual(authorizer.user_table['user1']['pwd'], db_users[0][1])
self.assertEqual(authorizer.user_table['user1']['home'], db_users[0][2])
self.assertEqual(authorizer.user_table['user1']['perm'], db_users[0][3])
self.assertIn('user2', authorizer.user_table)
# Verify directories were "created"
expected_path_calls = [
call(self.mock_cfg.virtpath + 'user1'),
call(self.mock_cfg.virtpath + 'user2'),
]
mock_path_constructor.assert_has_calls(expected_path_calls, any_order=True)
self.assertEqual(mock_path_instance.mkdir.call_count, 2)
mock_path_instance.mkdir.assert_called_with(parents=True, exist_ok=True)
@patch('FtpCsvReceiver.Path')
def test_init_mkdir_exception(self, mock_path_constructor):
# Simulate database result
db_users = [('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr')]
self.mock_cursor.fetchall.return_value = db_users
# Mock Path to raise an exception
mock_path_instance = MagicMock()
mock_path_constructor.return_value = mock_path_instance
mock_path_instance.mkdir.side_effect = OSError("Permission denied")
# We expect initialization to continue, but maybe log an error (though the code uses self.responde which isn't available here)
# For a unit test, we just check that the user is still added
authorizer = DummySha256Authorizer(self.mock_cfg)
self.assertIn('user1', authorizer.user_table)
mock_path_instance.mkdir.assert_called_once()
def test_validate_authentication_success(self):
self.mock_cursor.fetchall.return_value = [] # No DB users for simplicity
authorizer = DummySha256Authorizer(self.mock_cfg)
# Test admin user
authorizer.validate_authentication('admin', 'adminpass', None) # Handler not used in this method
def test_validate_authentication_wrong_password(self):
self.mock_cursor.fetchall.return_value = []
authorizer = DummySha256Authorizer(self.mock_cfg)
with self.assertRaises(ftp_csv_receiver.AuthenticationFailed):
authorizer.validate_authentication('admin', 'wrongpass', None)
def test_validate_authentication_unknown_user(self):
self.mock_cursor.fetchall.return_value = []
authorizer = DummySha256Authorizer(self.mock_cfg)
with self.assertRaises(ftp_csv_receiver.AuthenticationFailed):
authorizer.validate_authentication('unknown', 'somepass', None)
class TestASEHandler(unittest.TestCase):
def setUp(self):
self.mock_cfg = create_mock_cfg()
self.mock_conn = MagicMock() # Mock FTP connection object
self.mock_server = MagicMock() # Mock FTP server object
self.mock_authorizer = MagicMock(spec=DummySha256Authorizer) # Mock authorizer
# Instantiate the handler
# We need to manually set cfg and authorizer as done in main()
self.handler = ASEHandler(self.mock_conn, self.mock_server)
self.handler.cfg = self.mock_cfg
self.handler.authorizer = self.mock_authorizer
self.handler.respond = MagicMock() # Mock the respond method
self.handler.push = MagicMock() # Mock the push method
# Mock database for handler methods
self.mock_db_conn = MagicMock()
self.mock_db_cursor = MagicMock()
# Patch conn_db globally for this test class
self.patcher_conn_db = patch('FtpCsvReceiver.conn_db', return_value=self.mock_db_conn)
self.mock_conn_db = self.patcher_conn_db.start()
self.mock_db_conn.cursor.return_value = self.mock_db_cursor
# Mock logging
self.patcher_logging = patch('FtpCsvReceiver.logging')
self.mock_logging = self.patcher_logging.start()
def tearDown(self):
# Stop the patchers
self.patcher_conn_db.stop()
self.patcher_logging.stop()
# Reset mocks if needed between tests (though setUp does this)
mock_mysql_connector.reset_mock()
@patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV'))
@patch('FtpCsvReceiver.os.path.splitext', return_value=('ID1234_data', '.CSV'))
@patch('FtpCsvReceiver.os.stat')
@patch('FtpCsvReceiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3')
@patch('FtpCsvReceiver.os.remove')
@patch('FtpCsvReceiver.extract_value') # Mock extract_value for focused testing
def test_on_file_received_success(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split):
mock_os_stat.return_value.st_size = 100 # Non-empty file
test_file_path = '/fake/ftp/root/user1/ID1234_data.CSV'
# Setup mock return values for extract_value
mock_extract.side_effect = ['ID1234', 'G801', 'LOC5678', 'MUX']
self.handler.on_file_received(test_file_path)
# Verify file stats checked
mock_os_stat.assert_called_once_with(test_file_path)
# Verify file opened
mock_file_open.assert_called_once_with(test_file_path, 'r')
# Verify path splitting
mock_split.assert_called_once_with(test_file_path)
mock_splitext.assert_called_once_with('ID1234_data.CSV')
# Verify extract_value calls
expected_extract_calls = [
call(self.mock_cfg.units_name, 'ID1234_data', ANY), # ANY for the lines string
call(self.mock_cfg.units_type, 'ID1234_data', ANY),
call(self.mock_cfg.tools_name, 'ID1234_data', ANY),
call(self.mock_cfg.tools_type, 'ID1234_data', ANY),
]
mock_extract.assert_has_calls(expected_extract_calls)
# Verify DB connection
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
# Verify DB insert
expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbrectable } (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s, %s, %s, %s, %s, %s)"
expected_data = ('ID1234_data', 'ID1234', 'G801', 'LOC5678', 'MUX', 'G801,col2,col3\nval1,val2,val3')
self.mock_db_cursor.execute.assert_called_once_with(expected_sql, expected_data)
self.mock_db_conn.commit.assert_called_once()
self.mock_db_conn.close.assert_called_once()
# Verify file removed
mock_os_remove.assert_called_once_with(test_file_path)
# Verify logging
self.mock_logging.info.assert_called_with(f'File {test_file_path} loaded: removed.')
@patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'data.WRONGEXT'))
@patch('FtpCsvReceiver.os.path.splitext', return_value=('data', '.WRONGEXT'))
@patch('FtpCsvReceiver.os.stat')
@patch('FtpCsvReceiver.os.remove')
def test_on_file_received_wrong_extension(self, mock_os_remove, mock_os_stat, mock_splitext, mock_split):
mock_os_stat.return_value.st_size = 100
test_file_path = '/fake/ftp/root/user1/data.WRONGEXT'
self.handler.on_file_received(test_file_path)
# Verify only stat, split, and splitext were called
mock_os_stat.assert_called_once_with(test_file_path)
mock_split.assert_called_once_with(test_file_path)
mock_splitext.assert_called_once_with('data.WRONGEXT')
# Verify DB, open, remove were NOT called
self.mock_conn_db.assert_not_called()
mock_os_remove.assert_not_called()
self.mock_logging.info.assert_not_called() # No logging in this path
@patch('FtpCsvReceiver.os.stat')
@patch('FtpCsvReceiver.os.remove')
def test_on_file_received_empty_file(self, mock_os_remove, mock_os_stat):
mock_os_stat.return_value.st_size = 0 # Empty file
test_file_path = '/fake/ftp/root/user1/empty.CSV'
self.handler.on_file_received(test_file_path)
# Verify stat called
mock_os_stat.assert_called_once_with(test_file_path)
# Verify file removed
mock_os_remove.assert_called_once_with(test_file_path)
# Verify logging
self.mock_logging.info.assert_called_with(f'File {test_file_path} was empty: removed.')
# Verify DB not called
self.mock_conn_db.assert_not_called()
@patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV'))
@patch('FtpCsvReceiver.os.path.splitext', return_value=('ID1234_data', '.CSV'))
@patch('FtpCsvReceiver.os.stat')
@patch('FtpCsvReceiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3')
@patch('FtpCsvReceiver.os.remove')
@patch('FtpCsvReceiver.extract_value', side_effect=['ID1234', 'G801', 'LOC5678', 'MUX'])
def test_on_file_received_db_error(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split):
mock_os_stat.return_value.st_size = 100
test_file_path = '/fake/ftp/root/user1/ID1234_data.CSV'
db_error = Exception("DB connection failed")
self.mock_db_cursor.execute.side_effect = db_error # Simulate DB error
self.handler.on_file_received(test_file_path)
# Verify DB interaction attempted
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
self.mock_db_cursor.execute.assert_called_once()
# Verify commit/close not called after error
self.mock_db_conn.commit.assert_not_called()
self.mock_db_conn.close.assert_not_called() # Should close be called in finally? Original code doesn't.
# Verify file was NOT removed
mock_os_remove.assert_not_called()
# Verify error logging
self.mock_logging.error.assert_any_call(f'File {test_file_path} not loaded. Held in user path.')
self.mock_logging.error.assert_any_call(f'{db_error}')
@patch('FtpCsvReceiver.os.remove')
def test_on_incomplete_file_received(self, mock_os_remove):
test_file_path = '/fake/ftp/root/user1/incomplete.part'
self.handler.on_incomplete_file_received(test_file_path)
mock_os_remove.assert_called_once_with(test_file_path)
@patch('FtpCsvReceiver.Path')
@patch('FtpCsvReceiver.os.path.basename', return_value='newuser')
def test_ftp_SITE_ADDU_success(self, mock_basename, mock_path_constructor):
mock_path_instance = MagicMock()
mock_path_constructor.return_value = mock_path_instance
password = 'newpassword'
expected_hash = sha256(password.encode("UTF-8")).hexdigest()
expected_home = self.mock_cfg.virtpath + 'newuser'
self.handler.ftp_SITE_ADDU(f'newuser {password}')
# Verify path creation
mock_path_constructor.assert_called_once_with(expected_home)
mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True)
# Verify authorizer call
self.handler.authorizer.add_user.assert_called_once_with(
'newuser', expected_hash, expected_home + '/', perm=self.mock_cfg.defperm # Note: Original code adds trailing slash here
)
# Verify DB interaction
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('newuser', '{expected_hash}', '{expected_home}', '{self.mock_cfg.defperm}')"
self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
self.mock_db_conn.commit.assert_called_once()
self.mock_db_conn.close.assert_called_once()
# Verify response
self.handler.respond.assert_called_once_with('200 SITE ADDU successful.')
# Verify logging
self.mock_logging.info.assert_called_with('User newuser created.')
def test_ftp_SITE_ADDU_missing_args(self):
self.handler.ftp_SITE_ADDU('newuser') # Missing password
self.handler.respond.assert_called_once_with('501 SITE ADDU failed. Command needs 2 arguments')
self.handler.authorizer.add_user.assert_not_called()
self.mock_conn_db.assert_not_called()
@patch('FtpCsvReceiver.Path')
@patch('FtpCsvReceiver.os.path.basename', return_value='newuser')
def test_ftp_SITE_ADDU_mkdir_error(self, mock_basename, mock_path_constructor):
mock_path_instance = MagicMock()
mock_path_constructor.return_value = mock_path_instance
error = OSError("Cannot create dir")
mock_path_instance.mkdir.side_effect = error
self.handler.ftp_SITE_ADDU('newuser newpassword')
self.handler.respond.assert_called_once_with(f'551 Error in create virtual user path: {error}')
self.handler.authorizer.add_user.assert_not_called()
self.mock_conn_db.assert_not_called()
@patch('FtpCsvReceiver.Path')
@patch('FtpCsvReceiver.os.path.basename', return_value='newuser')
def test_ftp_SITE_ADDU_db_error(self, mock_basename, mock_path_constructor):
mock_path_instance = MagicMock()
mock_path_constructor.return_value = mock_path_instance
error = Exception("DB insert failed")
self.mock_db_cursor.execute.side_effect = error
self.handler.ftp_SITE_ADDU('newuser newpassword')
# Verify mkdir called
mock_path_instance.mkdir.assert_called_once()
# Verify authorizer called (happens before DB)
self.handler.authorizer.add_user.assert_called_once()
# Verify DB interaction attempted
self.mock_conn_db.assert_called_once()
self.mock_db_cursor.execute.assert_called_once()
# Verify response
self.handler.respond.assert_called_once_with(f'501 SITE ADDU failed: {error}.')
@patch('FtpCsvReceiver.os.path.basename', return_value='olduser')
def test_ftp_SITE_DELU_success(self, mock_basename):
self.handler.ftp_SITE_DELU('olduser')
# Verify authorizer call
self.handler.authorizer.remove_user.assert_called_once_with('olduser')
# Verify DB interaction
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
expected_sql = f"UPDATE {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = now() WHERE ftpuser = 'olduser'"
self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
self.mock_db_conn.commit.assert_called_once()
self.mock_db_conn.close.assert_called_once()
# Verify response
self.handler.respond.assert_called_once_with('200 SITE DELU successful.')
# Verify logging
self.mock_logging.info.assert_called_with('User olduser deleted.')
@patch('FtpCsvReceiver.os.path.basename', return_value='olduser')
def test_ftp_SITE_DELU_error(self, mock_basename):
error = Exception("DB update failed")
self.mock_db_cursor.execute.side_effect = error
self.handler.ftp_SITE_DELU('olduser')
# Verify authorizer call (happens first)
self.handler.authorizer.remove_user.assert_called_once_with('olduser')
# Verify DB interaction attempted
self.mock_conn_db.assert_called_once()
self.mock_db_cursor.execute.assert_called_once()
# Verify response
self.handler.respond.assert_called_once_with('501 SITE DELU failed.')
@patch('FtpCsvReceiver.Path')
@patch('FtpCsvReceiver.os.path.basename', return_value='restoreme')
def test_ftp_SITE_RESU_success(self, mock_basename, mock_path_constructor):
mock_path_instance = MagicMock()
mock_path_constructor.return_value = mock_path_instance
user_data = ('restoreme', 'somehash', '/fake/ftp/root/restoreme', 'elmw')
self.mock_db_cursor.fetchone.return_value = user_data
self.handler.ftp_SITE_RESU('restoreme')
# Verify DB interaction
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
expected_update_sql = f"UPDATE {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = null WHERE ftpuser = 'restoreme'"
expected_select_sql = f"SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE ftpuser = 'restoreme'"
expected_db_calls = [
call(expected_update_sql),
call(expected_select_sql)
]
self.mock_db_cursor.execute.assert_has_calls(expected_db_calls)
self.mock_db_conn.commit.assert_called_once() # For the update
self.mock_db_cursor.fetchone.assert_called_once()
# Verify authorizer call
self.handler.authorizer.add_user.assert_called_once_with(*user_data)
# Verify path creation
mock_path_constructor.assert_called_once_with(self.mock_cfg.virtpath + 'restoreme')
mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True)
# Verify DB close
self.mock_db_conn.close.assert_called_once()
# Verify response
self.handler.respond.assert_called_once_with('200 SITE RESU successful.')
# Verify logging
self.mock_logging.info.assert_called_with('User restoreme restored.')
@patch('FtpCsvReceiver.os.path.basename', return_value='restoreme')
def test_ftp_SITE_RESU_db_error(self, mock_basename):
error = Exception("DB fetch failed")
# Simulate error on the SELECT statement
self.mock_db_cursor.execute.side_effect = [None, error] # First call (UPDATE) ok, second (SELECT) fails
self.handler.ftp_SITE_RESU('restoreme')
# Verify DB interaction attempted
self.mock_conn_db.assert_called_once()
self.assertEqual(self.mock_db_cursor.execute.call_count, 2) # Both UPDATE and SELECT attempted
self.mock_db_conn.commit.assert_called_once() # Commit for UPDATE happened
# Verify response
self.handler.respond.assert_called_once_with('501 SITE RESU failed.')
# Verify authorizer not called, mkdir not called
self.handler.authorizer.add_user.assert_not_called()
def test_ftp_SITE_LSTU_success(self):
user_list_data = [
('userA', 'elr'),
('userB', 'elmw'),
]
self.mock_db_cursor.fetchall.return_value = user_list_data
self.handler.ftp_SITE_LSTU('') # No argument needed
# Verify DB interaction
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
expected_sql = f'SELECT ftpuser, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL '
self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
self.mock_db_cursor.fetchall.assert_called_once()
# Verify push calls
expected_push_calls = [
call("214-The following virtual users are defined:\r\n"),
call('Username: userA\tPerms: elr\r\nUsername: userB\tPerms: elmw\r\n')
]
self.handler.push.assert_has_calls(expected_push_calls)
# Verify final response
self.handler.respond.assert_called_once_with("214 LSTU SITE command successful.")
def test_ftp_SITE_LSTU_db_error(self):
error = Exception("DB select failed")
self.mock_db_cursor.execute.side_effect = error
self.handler.ftp_SITE_LSTU('')
# Verify DB interaction attempted
self.mock_conn_db.assert_called_once()
self.mock_db_cursor.execute.assert_called_once()
# Verify response
self.handler.respond.assert_called_once_with(f'501 list users failed: {error}')
# Verify push not called
self.handler.push.assert_not_called()
if __name__ == '__main__':
unittest.main(argv=['first-arg-is-ignored'], exit=False)


@@ -1,9 +1,10 @@
 import logging
 import mysql.connector
+from mysql.connector import Error

 logger = logging.getLogger(__name__)

-def nnn_connetti_db(cfg: object) -> object:
+def connetti_db(cfg: object) -> object:
     """
     Establishes a connection to a MySQL database.
@@ -20,10 +21,14 @@ def nnn_connetti_db(cfg: object) -> object:
     A MySQL connection object if the connection is successful, otherwise None.
     """
     try:
-        conn = mysql.connector.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport, database=cfg.dbname)
+        conn = mysql.connector.connect(user=cfg.dbuser,
+                                       password=cfg.dbpass,
+                                       host=cfg.dbhost,
+                                       port=cfg.dbport,
+                                       database=cfg.dbname)
         conn.autocommit = True
         logger.info("Connected")
         return conn
-    except mysql.connector.Error as e:
+    except Error as e:
         logger.error(f"Database connection error: {e}")
         raise  # Re-raise the exception to be handled by the caller
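Since connetti_db() re-raises the driver error, callers own the failure path. A hypothetical call site (cfg as elsewhere in this commit):

import logging
from mysql.connector import Error

try:
    conn = connetti_db(cfg)
    cur = conn.cursor()
    cur.execute("SELECT 1")   # illustrative probe query
    cur.close()
    conn.close()
except Error as e:
    logging.critical(f"Database unreachable, giving up: {e}")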


@@ -4,14 +4,28 @@ import asyncio
 logger = logging.getLogger(__name__)

-timestamp_cols = ['inserted_at', 'loaded_at', 'elaborated_at']
+timestamp_cols = ["inserted_at", "loaded_at", "elaborated_at"]

 async def load_data(cfg: object, matrice_valori: list, pool) -> bool:
+    """Load a list of raw data records into the database.
+
+    Performs a bulk insert (executemany) to load the data, using the
+    'ON DUPLICATE KEY UPDATE' clause to refresh existing records, and
+    implements retry logic in case of deadlocks.
+
+    Args:
+        cfg (object): The configuration object with the table names and retry parameters.
+        matrice_valori (list): A list of tuples, each one a row to insert.
+        pool: The database connection pool.
+
+    Returns:
+        bool: True if the load succeeded, False otherwise.
+    """
     if not matrice_valori:
         logger.info("Nulla da caricare.")
         return True

-    sql_insert_RAWDATA = f'''
+    sql_insert_RAWDATA = f"""
     INSERT INTO {cfg.dbrawdata} (
         `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`,
         `Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`,
@@ -47,7 +61,7 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool :
         `TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, {cfg.dbrawdata}.`TemperatureModule`),
         `RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`),
         `Created_at` = NOW()
-    '''
+    """

     rc = False
     async with pool.acquire() as conn:
@@ -65,10 +79,12 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool :
                 logging.error(f"Error: {e}.")
                 if e.args[0] == 1213:  # Deadlock detected
-                    logging.warning(f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}")
+                    logging.warning(
+                        f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}"
+                    )
                     if attempt < cfg.max_retries - 1:
-                        delay = (2 * attempt)
+                        delay = 2 * attempt
                         await asyncio.sleep(delay)
                         continue
                     else:
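Note the schedule this implements: delay = 2 * attempt is linear backoff, so with max_retries = 3 the waits are 0s and then 2s (an exponential variant would use 2 ** attempt). The same pattern as a standalone sketch (error code 1213 is MySQL's ER_LOCK_DEADLOCK):

import asyncio

async def executemany_with_retry(cur, sql: str, rows: list, max_retries: int = 3) -> bool:
    for attempt in range(max_retries):
        try:
            await cur.executemany(sql, rows)
            return True
        except Exception as e:
            if e.args and e.args[0] == 1213 and attempt < max_retries - 1:
                await asyncio.sleep(2 * attempt)  # linear backoff: 0s, 2s, ...
                continue
            raise
    return False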
@@ -76,29 +92,64 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool :
                         raise
     return rc

 async def update_status(cfg: object, id: int, status: int, pool) -> None:
+    """Update the status of a record in the CSV records table.
+
+    Args:
+        cfg (object): The configuration object with the table name.
+        id (int): The ID of the record to update.
+        status (int): The new status to set.
+        pool: The database connection pool.
+    """
     async with pool.acquire() as conn:
         async with conn.cursor() as cur:
             try:
-                await cur.execute(f'update {cfg.dbrectable} set status = {status}, {timestamp_cols[status]} = now() where id = {id}')
+                await cur.execute(
+                    f"update {cfg.dbrectable} set status = {status}, {timestamp_cols[status]} = now() where id = {id}"
+                )
                 await conn.commit()
                 logging.info("Status updated.")
             except Exception as e:
                 await conn.rollback()
-                logging.error(f'Error: {e}')
+                logging.error(f"Error: {e}")

 async def unlock(cfg: object, id: int, pool) -> None:
+    """Unlock a record in the CSV records table.
+
+    Sets the 'locked' field to 0 for the given ID.
+
+    Args:
+        cfg (object): The configuration object with the table name.
+        id (int): The ID of the record to unlock.
+        pool: The database connection pool.
+    """
     async with pool.acquire() as conn:
         async with conn.cursor() as cur:
             try:
-                await cur.execute(f'update {cfg.dbrectable} set locked = 0 where id = {id}')
+                await cur.execute(
+                    f"update {cfg.dbrectable} set locked = 0 where id = {id}"
+                )
                 await conn.commit()
                 logging.info(f"id {id} unlocked.")
             except Exception as e:
                 await conn.rollback()
-                logging.error(f'Error: {e}')
+                logging.error(f"Error: {e}")

 async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool) -> tuple:
+    """Fetch the information needed to run a Matlab command from the database.
+
+    Args:
+        cfg (object): The configuration object.
+        unit (str): The unit name.
+        tool (str): The tool name.
+        pool: The database connection pool.
+
+    Returns:
+        tuple: A tuple with the Matlab command information, or None on error.
+    """
     async with pool.acquire() as conn:
         async with conn.cursor() as cur:
             try:
@@ -110,4 +161,4 @@ async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool) -> tuple:
                     where t.name = "{tool}" and u.name = "{unit}"''')
                 return cur.fetchone()
             except Exception as e:
-                logging.error(f'Error: {e}')
+                logging.error(f"Error: {e}")
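Two caveats about the function above, shown as a hypothetical variant rather than the project's code: in aiomysql, fetchone() is a coroutine, so "return cur.fetchone()" hands back an un-awaited coroutine object; and %s placeholders let the driver escape the unit/tool values instead of interpolating them into the f-string. Table and column names here are illustrative:

async def fetch_by_tool_and_unit(pool, table: str, tool: str, unit: str):
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # Only the table name, which cannot be a placeholder, is interpolated.
            await cur.execute(
                f"SELECT * FROM {table} WHERE tool_name = %s AND unit_name = %s",
                (tool, unit),
            )
            return await cur.fetchone()  # awaited, unlike the code above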


@@ -4,6 +4,21 @@ import aiomysql
 logger = logging.getLogger(__name__)

 async def get_matlab_command(cfg: object, tool: str, unit: str, pool) -> tuple:
+    """Fetch the information needed to run a Matlab command from the database.
+
+    Queries the database for the details required to launch a Matlab
+    script, based on the tool name and the unit name.
+
+    Args:
+        cfg (object): The configuration object.
+        tool (str): The tool name.
+        unit (str): The unit name.
+        pool: The database connection pool.
+
+    Returns:
+        tuple: A tuple with the Matlab command information,
+            or None if no command is found.
+    """
     async with pool.acquire() as conn:
         async with conn.cursor(aiomysql.DictCursor) as cur:


@@ -5,6 +5,18 @@ import logging
 logger = logging.getLogger(__name__)

 async def get_nodes_type(cfg: object, tool: str, unit: str, pool) -> tuple:
+    """Fetch node information (type, channels, inputs) for a given tool and unit.
+
+    Args:
+        cfg (object): The configuration object.
+        tool (str): The tool name.
+        unit (str): The unit name.
+        pool: The database connection pool.
+
+    Returns:
+        tuple: A tuple of four lists: channels, types, ains, dins.
+            If no results are found, returns (None, None, None, None).
+    """
     async with pool.acquire() as conn:
         async with conn.cursor(aiomysql.DictCursor) as cur:
@@ -32,5 +44,3 @@ async def get_nodes_type(cfg: object, tool: str, unit: str, pool) -> tuple:
                 ains.append(row['ain'])
                 dins.append(row['din'])

     return channels, types, ains, dins
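A hypothetical call site for the four parallel lists (inside a coroutine, with cfg and pool in scope; the sample tool and unit names are illustrative):

channels, types, ains, dins = await get_nodes_type(cfg, "DT0001", "ID0001", pool)
if channels is None:
    logger.warning("No nodes found for this tool/unit")
else:
    for ch, ty, ain, din in zip(channels, types, ains, dins):
        logger.debug(f"node {ch}: type={ty}, ain={ain}, din={din}")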