Compare commits

..

5 Commits

Author SHA1 Message Date
f003ba68ed estrtto codice duplicato e devinito modulo orchestrator_utils 2025-07-12 18:16:55 +02:00
7edaef3563 add func parm type 2025-07-12 17:33:38 +02:00
b1ce9061b1 add comment 2025-07-11 22:06:45 +02:00
0022d0e326 dict cursor e pool conn 2025-07-06 23:27:13 +02:00
301aa53c72 elab matlab 2025-07-06 21:52:41 +02:00
39 changed files with 3401 additions and 806 deletions

View File

@@ -1,77 +1,82 @@
#!.venv/bin/python #!.venv/bin/python
# Import necessary libraries # Import necessary libraries
import mysql.connector
import logging import logging
import importlib
import time
import asyncio import asyncio
import subprocess
# Import custom modules for configuration and database connection # Import custom modules for configuration and database connection
from utils.config import loader_ftp_csv as setting from utils.config import loader_matlab_elab as setting
from utils.database.connection import connetti_db
from utils.database.loader_action import get_matlab_cmd
from utils.database import DATA_LOADED from utils.database import DATA_LOADED
from utils.database.matlab_query import get_matlab_command
from utils.csv.loaders import get_next_csv_atomic
from utils.orchestrator_utils import run_orchestrator, worker_context
# Initialize the logger for this module # Initialize the logger for this module
logger = logging.getLogger(__name__) logger = logging.getLogger()
# Function to elaborate CSV data # Delay tra un processamento CSV e il successivo (in secondi)
async def run_matlab_elab(id: int, unit_name: str, unit_type: str, tool_name: str, tool_type: str, semaphore: asyncio.Semaphore) -> bool: ELAB_PROCESSING_DELAY = 0.2
async with semaphore: # Tempo di attesa se non ci sono record da elaborare
if get_matlab_cmd(cfg, unit_name, tool_name): NO_RECORD_SLEEP = 60
# If a record is found, lock it by updating the 'locked' field to 1
async def worker(worker_id: int, cfg: object, pool: object) -> None:
"""Esegue il ciclo di lavoro per l'elaborazione dei dati caricati.
Il worker preleva un record dal database che indica dati pronti per
l'elaborazione, esegue un comando Matlab associato e attende
prima di iniziare un nuovo ciclo.
Args:
worker_id (int): L'ID univoco del worker.
cfg (object): L'oggetto di configurazione.
pool (object): Il pool di connessioni al database.
"""
# Imposta il context per questo worker
worker_context.set(f"W{worker_id:02d}")
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
logger.info("Avviato")
while True:
try:
logger.info("Inizio elaborazione")
record = await get_next_csv_atomic(pool, cfg.dbrectable, DATA_LOADED)
if record:
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
matlab_info = await get_matlab_command(cfg, tool_name, unit_name, pool)
if matlab_info:
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{matlab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name} {tool_name}"
# matlab_error_filename = f'{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt'
proc = await asyncio.create_subprocess_shell(
matlab_cmd,
cwd=cfg.matlab_func_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
logger.error("Errore durante l'elaborazione")
logger.error(stderr.decode().strip())
logger.info(stdout.decode().strip())
await asyncio.sleep(ELAB_PROCESSING_DELAY)
else:
logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP)
except Exception as e:
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
await asyncio.sleep(1)
async def main(): async def main():
# Load the configuration settings """Funzione principale che avvia l'elab_orchestrator."""
cfg = setting.Config() await run_orchestrator(setting.Config, worker)
try:
# Configure logging to write log messages to a file with a specific format
logging.basicConfig(
format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
filename=cfg.logfilename,
level=logging.INFO,
)
# Limita il numero di esecuzioni concorrenti a max_threads
semaphore = asyncio.Semaphore(cfg.max_threads)
running_tasks = set()
# Enter an infinite loop to continuously process records
while True:
try:
# Establish a database connection
with connetti_db(cfg) as conn:
cur = conn.cursor()
# Select a single record from the raw data table that is not currently locked and has a status of 0
cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = {DATA_LOADED} limit 1')
id, unit_name, unit_type, tool_name, tool_type = cur.fetchone()
if id:
task = asyncio.create_task(run_matlab_elab(id, unit_name, unit_type, tool_name, tool_type, semaphore))
running_tasks.add(task)
# Rimuovi i task completati dal set
running_tasks = {t for t in running_tasks if not t.done()}
# If a record was successfully processed, log the number of threads currently running
#logger.info(f"Threads in execution: {len(threads)}")
except Exception as e:
logger.info(f"Error: {e}.")
except KeyboardInterrupt:
# Handle a keyboard interrupt (e.g., Ctrl+C) to gracefully shut down the program
logger.info("Info: Shutdown requested...exiting")
except Exception as e:
logger.info(f"Error: {e}.")
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())

16
env/elab.ini vendored
View File

@@ -0,0 +1,16 @@
[logging]
logFilename = ./elab_data.log
[threads]
max_num = 10
[matlab]
#runtime = /usr/local/MATLAB/MATLAB_Runtime/v93
#func_path = /usr/local/matlab_func/
runtime = /home/alex/matlab_sym/
func_path = /home/alex/matlab_sym/
timeout = 1800
error = ""
error_path = /tmp/

View File

@@ -61,13 +61,13 @@ class DummySha256Authorizer(DummyAuthorizer):
class ASEHandler(FTPHandler): class ASEHandler(FTPHandler):
"""Custom FTP handler that extends FTPHandler with custom commands and file handling.""" """Custom FTP handler that extends FTPHandler with custom commands and file handling."""
def __init__(self: object, conn: object, server: object, ioloop=None) -> None: def __init__(self: object, conn: object, server: object, ioloop:object=None) -> None:
"""Initializes the handler, adds custom commands, and sets up command permissions. """Initializes the handler, adds custom commands, and sets up command permissions.
Args: Args:
conn: The connection object. conn (object): The connection object.
server: The FTP server object. server (object): The FTP server object.
ioloop: The I/O loop object. ioloop (object): The I/O loop object.
""" """
super().__init__(conn, server, ioloop) super().__init__(conn, server, ioloop)
self.proto_cmds = FTPHandler.proto_cmds.copy() self.proto_cmds = FTPHandler.proto_cmds.copy()

View File

@@ -4,22 +4,12 @@
import logging import logging
import importlib import importlib
import asyncio import asyncio
import os
import aiomysql
import contextvars
# Import custom modules for configuration and database connection # Import custom modules for configuration and database connection
from utils.config import loader_load_data as setting from utils.config import loader_load_data as setting
from utils.database import CSV_RECEIVED from utils.database import CSV_RECEIVED
from utils.csv.loaders import get_next_csv_atomic
# Crea una context variable per identificare il worker from utils.orchestrator_utils import run_orchestrator, worker_context
worker_context = contextvars.ContextVar('worker_id', default='00')
# Formatter personalizzato che include il worker_id
class WorkerFormatter(logging.Formatter):
def format(self, record):
record.worker_id = worker_context.get()
return super().format(record)
# Initialize the logger for this module # Initialize the logger for this module
logger = logging.getLogger() logger = logging.getLogger()
@@ -29,53 +19,28 @@ CSV_PROCESSING_DELAY = 0.2
# Tempo di attesa se non ci sono record da elaborare # Tempo di attesa se non ci sono record da elaborare
NO_RECORD_SLEEP = 60 NO_RECORD_SLEEP = 60
async def get_next_csv_atomic(pool, table_name): async def worker(worker_id: int, cfg: object, pool: object) -> None:
"""Preleva atomicamente il prossimo CSV da elaborare""" """Esegue il ciclo di lavoro per l'elaborazione dei file CSV.
async with pool.acquire() as conn:
# IMPORTANTE: Disabilita autocommit per questa transazione
await conn.begin()
try: Il worker preleva un record CSV dal database, ne elabora il contenuto
async with conn.cursor() as cur: e attende prima di iniziare un nuovo ciclo.
# Usa SELECT FOR UPDATE per lock atomico
await cur.execute(f"""
SELECT id, unit_type, tool_type, unit_name, tool_name
FROM {table_name}
WHERE locked = 0 AND status = %s
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED
""", (CSV_RECEIVED,))
result = await cur.fetchone() Args:
if result: worker_id (int): L'ID univoco del worker.
await cur.execute(f""" cfg (object): L'oggetto di configurazione.
UPDATE {table_name} pool (object): Il pool di connessioni al database.
SET locked = 1 """
WHERE id = %s
""", (result[0],))
# Commit esplicito per rilasciare il lock
await conn.commit()
return result
except Exception as e:
# Rollback in caso di errore
await conn.rollback()
raise e
async def worker(worker_id: int, cfg: object, pool) -> None:
# Imposta il context per questo worker # Imposta il context per questo worker
worker_context.set(f"W{worker_id}") worker_context.set(f"W{worker_id:02d}")
debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG) debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
logger.info("Avviato") logger.info("Avviato")
while True: while True:
try: try:
logger.info("Inizio elaborazione") logger.info("Inizio elaborazione")
record = await get_next_csv_atomic(pool, cfg.dbrectable) record = await get_next_csv_atomic(pool, cfg.dbrectable, CSV_RECEIVED)
if record: if record:
success = await load_csv(record, cfg, pool) success = await load_csv(record, cfg, pool)
@@ -83,25 +48,42 @@ async def worker(worker_id: int, cfg: object, pool) -> None:
logger.error("Errore durante l'elaborazione") logger.error("Errore durante l'elaborazione")
await asyncio.sleep(CSV_PROCESSING_DELAY) await asyncio.sleep(CSV_PROCESSING_DELAY)
else: else:
logger.debug("Nessun record disponibile") logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP) await asyncio.sleep(NO_RECORD_SLEEP)
except Exception as e: except Exception as e:
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode) logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
await asyncio.sleep(1) await asyncio.sleep(1)
async def load_csv(record: tuple, cfg: object, pool) -> bool:
debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG) async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
"""Carica ed elabora un record CSV utilizzando il modulo di parsing appropriato.
Args:
record: Una tupla contenente i dettagli del record CSV da elaborare (id, unit_type, tool_type, unit_name, tool_name).
cfg: L'oggetto di configurazione contenente i parametri del sistema.
pool (object): Il pool di connessioni al database.
Returns:
True se l'elaborazione del CSV è avvenuta con successo, False altrimenti.
"""
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
logger.debug("Inizio ricerca nuovo CSV da elaborare") logger.debug("Inizio ricerca nuovo CSV da elaborare")
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record] id, unit_type, tool_type, unit_name, tool_name = [
logger.info(f'Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}') x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record
]
logger.info(
f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}"
)
# Costruisce il nome del modulo da caricare dinamicamente # Costruisce il nome del modulo da caricare dinamicamente
module_names = [f'utils.parsers.by_name.{unit_name}_{tool_name}', module_names = [
f'utils.parsers.by_name.{unit_name}_{tool_type}', f"utils.parsers.by_name.{unit_name}_{tool_name}",
f'utils.parsers.by_name.{unit_name}_all', f"utils.parsers.by_name.{unit_name}_{tool_type}",
f'utils.parsers.by_type.{unit_type}_{tool_type}'] f"utils.parsers.by_name.{unit_name}_all",
f"utils.parsers.by_type.{unit_type}_{tool_type}",
]
modulo = None modulo = None
for module_name in module_names: for module_name in module_names:
try: try:
@@ -110,7 +92,10 @@ async def load_csv(record: tuple, cfg: object, pool) -> bool:
logger.info(f"Funzione 'main_loader' caricata dal modulo {module_name}") logger.info(f"Funzione 'main_loader' caricata dal modulo {module_name}")
break break
except (ImportError, AttributeError) as e: except (ImportError, AttributeError) as e:
logger.debug(f"Modulo {module_name} non presente o non valido. {e}", exc_info=debug_mode) logger.debug(
f"Modulo {module_name} non presente o non valido. {e}",
exc_info=debug_mode,
)
if not modulo: if not modulo:
logger.error(f"Nessun modulo trovato {module_names}") logger.error(f"Nessun modulo trovato {module_names}")
@@ -125,64 +110,11 @@ async def load_csv(record: tuple, cfg: object, pool) -> bool:
logger.info(f"Elaborazione completata per ID={id}") logger.info(f"Elaborazione completata per ID={id}")
return True return True
async def main(): async def main():
"""Main function: avvia i worker e gestisce il ciclo principale.""" """Funzione principale che avvia il load_orchestrator."""
logger.info("Avvio del sistema...") await run_orchestrator(setting.Config, worker)
cfg = setting.Config()
logger.info("Configurazione caricata correttamente")
try:
# Configura il logging globale
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
# Configura il logging con il formatter personalizzato
handler = logging.FileHandler(cfg.logfilename)
formatter = WorkerFormatter(
"%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s"
)
handler.setFormatter(formatter)
# Rimuovi eventuali handler esistenti e aggiungi il nostro
logger.handlers.clear()
logger.addHandler(handler)
logger.setLevel(getattr(logging, log_level))
logger.info("Logging configurato correttamente")
# Numero massimo di worker concorrenti
logger.info(f"Avvio di {cfg.max_threads} worker concorrenti")
pool = await aiomysql.create_pool(
host=cfg.dbhost,
user=cfg.dbuser,
password=cfg.dbpass,
db=cfg.dbname,
minsize=4,
maxsize=cfg.max_threads*4,
pool_recycle=3600
)
# Avvia i worker
workers = [
asyncio.create_task(worker(i, cfg, pool))
for i in range(cfg.max_threads)
]
logger.info("Sistema avviato correttamente. In attesa di nuovi task...")
try:
await asyncio.gather(*workers, return_exceptions=debug_mode)
finally:
pool.close()
await pool.wait_closed()
except KeyboardInterrupt:
logger.info("Info: Shutdown richiesto... chiusura in corso")
except Exception as e:
logger.error(f"Errore principale: {e}", exc_info=debug_mode)
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())

2584
old_script/TS_PiniScript.py Executable file

File diff suppressed because one or more lines are too long

15
old_script/dbconfig.py Executable file
View File

@@ -0,0 +1,15 @@
from configparser import ConfigParser
def read_db_config(filename='/home/battilo/scripts/config.ini', section='mysql'):
parser = ConfigParser()
parser.read(filename)
db = {}
if parser.has_section(section):
items = parser.items(section)
for item in items:
db[item[0]] = item[1]
else:
raise Exception('{0} not found in the {1} file'.format(section, filename))
return db

View File

@@ -1,546 +0,0 @@
import unittest
import os
import sys
from unittest.mock import patch, MagicMock, mock_open, call, ANY
from hashlib import sha256
from pathlib import Path
from types import SimpleNamespace # Used to create mock config objects
# Add the parent directory to sys.path to allow importing FtpCsvReceiver
# Adjust this path if your test file is located differently
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(script_dir)
# If FtpCsvReceiver.py is in the same directory as the test file, you might not need this
# If it's in the parent directory (like /home/alex/devel/ASE/), use this:
sys.path.insert(0, parent_dir)
# Now import the components to test
# We need to import AFTER modifying sys.path if necessary
# Also, mock dependencies BEFORE importing the module that uses them
# Mock mysql.connector BEFORE importing FtpCsvReceiver
mock_mysql_connector = MagicMock()
sys.modules['mysql.connector'] = mock_mysql_connector
# Mock the custom utils modules as well if they aren't available in the test environment
mock_utils_time = MagicMock()
mock_utils_config = MagicMock()
sys.modules['utils.time'] = mock_utils_time
sys.modules['utils.config'] = mock_utils_config
# Mock the setting.config() call specifically
mock_config_instance = MagicMock()
mock_utils_config.set_config.config.return_value = mock_config_instance
# Mock pyftpdlib classes if needed for specific tests, but often mocking methods is enough
# sys.modules['pyftpdlib.handlers'] = MagicMock()
# sys.modules['pyftpdlib.authorizers'] = MagicMock()
# sys.modules['pyftpdlib.servers'] = MagicMock()
# Import the module AFTER mocking dependencies
import ftp_csv_receiver
from ftp_csv_receiver import (
extract_value,
DummySha256Authorizer,
ASEHandler,
conn_db, # Import even though we mock mysql.connector
)
# --- Test Configuration Setup ---
def create_mock_cfg():
"""Creates a mock configuration object for testing."""
cfg = SimpleNamespace()
cfg.adminuser = ['admin', sha256(b'adminpass').hexdigest(), '/fake/admin/path', 'elradfmwMT']
cfg.dbhost = 'mockhost'
cfg.dbport = 3306
cfg.dbuser = 'mockuser'
cfg.dbpass = 'mockpass'
cfg.dbname = 'mockdb'
cfg.dbusertable = 'mock_virtusers'
cfg.dbrectable = 'mock_received'
cfg.virtpath = '/fake/ftp/root/'
cfg.defperm = 'elmw'
cfg.fileext = ['.CSV', '.TXT']
# Add patterns as lists of strings
cfg.units_name = [r'ID\d{4}', r'IX\d{4}']
cfg.units_type = [r'G801', r'G201']
cfg.tools_name = [r'LOC\d{4}', r'DT\d{4}']
cfg.tools_type = [r'MUX', r'MUMS']
# Add other necessary config values
cfg.logfilename = 'test_ftp.log'
cfg.proxyaddr = '0.0.0.0'
cfg.firstport = 40000
cfg.portrangewidth = 10
return cfg
# --- Test Cases ---
class TestExtractValue(unittest.TestCase):
def test_extract_from_primary(self):
patterns = [r'ID(\d+)']
primary = "File_ID1234_data.csv"
secondary = "Some other text"
self.assertEqual(extract_value(patterns, primary, secondary), "ID1234")
def test_extract_from_secondary(self):
patterns = [r'Type(A|B)']
primary = "Filename_without_type.txt"
secondary = "Log data: TypeB found"
self.assertEqual(extract_value(patterns, primary, secondary), "TypeB")
def test_no_match(self):
patterns = [r'XYZ\d+']
primary = "File_ID1234_data.csv"
secondary = "Log data: TypeB found"
self.assertEqual(extract_value(patterns, primary, secondary, default="NotFound"), "NotFound")
def test_case_insensitive(self):
patterns = [r'id(\d+)']
primary = "File_ID1234_data.csv"
secondary = "Some other text"
self.assertEqual(extract_value(patterns, primary, secondary), "ID1234") # Note: re.findall captures original case
def test_multiple_patterns(self):
patterns = [r'Type(A|B)', r'ID(\d+)']
primary = "File_ID1234_data.csv"
secondary = "Log data: TypeB found"
# Should match the first pattern found in the primary source
self.assertEqual(extract_value(patterns, primary, secondary), "ID1234")
def test_multiple_patterns_secondary_match(self):
patterns = [r'XYZ\d+', r'Type(A|B)']
primary = "File_ID1234_data.csv"
secondary = "Log data: TypeB found"
# Should match the second pattern in the secondary source
self.assertEqual(extract_value(patterns, primary, secondary), "TypeB")
class TestDummySha256Authorizer(unittest.TestCase):
def setUp(self):
self.mock_cfg = create_mock_cfg()
# Mock the database connection and cursor
self.mock_conn = MagicMock()
self.mock_cursor = MagicMock()
mock_mysql_connector.connect.return_value = self.mock_conn
self.mock_conn.cursor.return_value = self.mock_cursor
@patch('FtpCsvReceiver.Path') # Mock Path object
def test_init_loads_users(self, mock_path_constructor):
# Mock Path instance methods
mock_path_instance = MagicMock()
mock_path_constructor.return_value = mock_path_instance
# Simulate database result
db_users = [
('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr'),
('user2', sha256(b'pass2').hexdigest(), '/fake/ftp/root/user2', 'elmw'),
]
self.mock_cursor.fetchall.return_value = db_users
authorizer = DummySha256Authorizer(self.mock_cfg)
# Verify DB connection
mock_mysql_connector.connect.assert_called_once_with(
user=self.mock_cfg.dbuser, password=self.mock_cfg.dbpass,
host=self.mock_cfg.dbhost, port=self.mock_cfg.dbport
)
# Verify query
self.mock_cursor.execute.assert_called_once_with(
f'SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL'
)
# Verify admin user added
self.assertIn('admin', authorizer.user_table)
self.assertEqual(authorizer.user_table['admin']['pwd'], self.mock_cfg.adminuser[1])
# Verify DB users added
self.assertIn('user1', authorizer.user_table)
self.assertEqual(authorizer.user_table['user1']['pwd'], db_users[0][1])
self.assertEqual(authorizer.user_table['user1']['home'], db_users[0][2])
self.assertEqual(authorizer.user_table['user1']['perm'], db_users[0][3])
self.assertIn('user2', authorizer.user_table)
# Verify directories were "created"
expected_path_calls = [
call(self.mock_cfg.virtpath + 'user1'),
call(self.mock_cfg.virtpath + 'user2'),
]
mock_path_constructor.assert_has_calls(expected_path_calls, any_order=True)
self.assertEqual(mock_path_instance.mkdir.call_count, 2)
mock_path_instance.mkdir.assert_called_with(parents=True, exist_ok=True)
@patch('FtpCsvReceiver.Path')
def test_init_mkdir_exception(self, mock_path_constructor):
# Simulate database result
db_users = [('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr')]
self.mock_cursor.fetchall.return_value = db_users
# Mock Path to raise an exception
mock_path_instance = MagicMock()
mock_path_constructor.return_value = mock_path_instance
mock_path_instance.mkdir.side_effect = OSError("Permission denied")
# We expect initialization to continue, but maybe log an error (though the code uses self.responde which isn't available here)
# For a unit test, we just check that the user is still added
authorizer = DummySha256Authorizer(self.mock_cfg)
self.assertIn('user1', authorizer.user_table)
mock_path_instance.mkdir.assert_called_once()
def test_validate_authentication_success(self):
self.mock_cursor.fetchall.return_value = [] # No DB users for simplicity
authorizer = DummySha256Authorizer(self.mock_cfg)
# Test admin user
authorizer.validate_authentication('admin', 'adminpass', None) # Handler not used in this method
def test_validate_authentication_wrong_password(self):
self.mock_cursor.fetchall.return_value = []
authorizer = DummySha256Authorizer(self.mock_cfg)
with self.assertRaises(ftp_csv_receiver.AuthenticationFailed):
authorizer.validate_authentication('admin', 'wrongpass', None)
def test_validate_authentication_unknown_user(self):
self.mock_cursor.fetchall.return_value = []
authorizer = DummySha256Authorizer(self.mock_cfg)
with self.assertRaises(ftp_csv_receiver.AuthenticationFailed):
authorizer.validate_authentication('unknown', 'somepass', None)
class TestASEHandler(unittest.TestCase):
def setUp(self):
self.mock_cfg = create_mock_cfg()
self.mock_conn = MagicMock() # Mock FTP connection object
self.mock_server = MagicMock() # Mock FTP server object
self.mock_authorizer = MagicMock(spec=DummySha256Authorizer) # Mock authorizer
# Instantiate the handler
# We need to manually set cfg and authorizer as done in main()
self.handler = ASEHandler(self.mock_conn, self.mock_server)
self.handler.cfg = self.mock_cfg
self.handler.authorizer = self.mock_authorizer
self.handler.respond = MagicMock() # Mock the respond method
self.handler.push = MagicMock() # Mock the push method
# Mock database for handler methods
self.mock_db_conn = MagicMock()
self.mock_db_cursor = MagicMock()
# Patch conn_db globally for this test class
self.patcher_conn_db = patch('FtpCsvReceiver.conn_db', return_value=self.mock_db_conn)
self.mock_conn_db = self.patcher_conn_db.start()
self.mock_db_conn.cursor.return_value = self.mock_db_cursor
# Mock logging
self.patcher_logging = patch('FtpCsvReceiver.logging')
self.mock_logging = self.patcher_logging.start()
def tearDown(self):
# Stop the patchers
self.patcher_conn_db.stop()
self.patcher_logging.stop()
# Reset mocks if needed between tests (though setUp does this)
mock_mysql_connector.reset_mock()
@patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV'))
@patch('FtpCsvReceiver.os.path.splitext', return_value=('ID1234_data', '.CSV'))
@patch('FtpCsvReceiver.os.stat')
@patch('FtpCsvReceiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3')
@patch('FtpCsvReceiver.os.remove')
@patch('FtpCsvReceiver.extract_value') # Mock extract_value for focused testing
def test_on_file_received_success(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split):
mock_os_stat.return_value.st_size = 100 # Non-empty file
test_file_path = '/fake/ftp/root/user1/ID1234_data.CSV'
# Setup mock return values for extract_value
mock_extract.side_effect = ['ID1234', 'G801', 'LOC5678', 'MUX']
self.handler.on_file_received(test_file_path)
# Verify file stats checked
mock_os_stat.assert_called_once_with(test_file_path)
# Verify file opened
mock_file_open.assert_called_once_with(test_file_path, 'r')
# Verify path splitting
mock_split.assert_called_once_with(test_file_path)
mock_splitext.assert_called_once_with('ID1234_data.CSV')
# Verify extract_value calls
expected_extract_calls = [
call(self.mock_cfg.units_name, 'ID1234_data', ANY), # ANY for the lines string
call(self.mock_cfg.units_type, 'ID1234_data', ANY),
call(self.mock_cfg.tools_name, 'ID1234_data', ANY),
call(self.mock_cfg.tools_type, 'ID1234_data', ANY),
]
mock_extract.assert_has_calls(expected_extract_calls)
# Verify DB connection
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
# Verify DB insert
expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbrectable } (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s, %s, %s, %s, %s, %s)"
expected_data = ('ID1234_data', 'ID1234', 'G801', 'LOC5678', 'MUX', 'G801,col2,col3\nval1,val2,val3')
self.mock_db_cursor.execute.assert_called_once_with(expected_sql, expected_data)
self.mock_db_conn.commit.assert_called_once()
self.mock_db_conn.close.assert_called_once()
# Verify file removed
mock_os_remove.assert_called_once_with(test_file_path)
# Verify logging
self.mock_logging.info.assert_called_with(f'File {test_file_path} loaded: removed.')
@patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'data.WRONGEXT'))
@patch('FtpCsvReceiver.os.path.splitext', return_value=('data', '.WRONGEXT'))
@patch('FtpCsvReceiver.os.stat')
@patch('FtpCsvReceiver.os.remove')
def test_on_file_received_wrong_extension(self, mock_os_remove, mock_os_stat, mock_splitext, mock_split):
mock_os_stat.return_value.st_size = 100
test_file_path = '/fake/ftp/root/user1/data.WRONGEXT'
self.handler.on_file_received(test_file_path)
# Verify only stat, split, and splitext were called
mock_os_stat.assert_called_once_with(test_file_path)
mock_split.assert_called_once_with(test_file_path)
mock_splitext.assert_called_once_with('data.WRONGEXT')
# Verify DB, open, remove were NOT called
self.mock_conn_db.assert_not_called()
mock_os_remove.assert_not_called()
self.mock_logging.info.assert_not_called() # No logging in this path
@patch('FtpCsvReceiver.os.stat')
@patch('FtpCsvReceiver.os.remove')
def test_on_file_received_empty_file(self, mock_os_remove, mock_os_stat):
mock_os_stat.return_value.st_size = 0 # Empty file
test_file_path = '/fake/ftp/root/user1/empty.CSV'
self.handler.on_file_received(test_file_path)
# Verify stat called
mock_os_stat.assert_called_once_with(test_file_path)
# Verify file removed
mock_os_remove.assert_called_once_with(test_file_path)
# Verify logging
self.mock_logging.info.assert_called_with(f'File {test_file_path} was empty: removed.')
# Verify DB not called
self.mock_conn_db.assert_not_called()
@patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV'))
@patch('FtpCsvReceiver.os.path.splitext', return_value=('ID1234_data', '.CSV'))
@patch('FtpCsvReceiver.os.stat')
@patch('FtpCsvReceiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3')
@patch('FtpCsvReceiver.os.remove')
@patch('FtpCsvReceiver.extract_value', side_effect=['ID1234', 'G801', 'LOC5678', 'MUX'])
def test_on_file_received_db_error(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split):
mock_os_stat.return_value.st_size = 100
test_file_path = '/fake/ftp/root/user1/ID1234_data.CSV'
db_error = Exception("DB connection failed")
self.mock_db_cursor.execute.side_effect = db_error # Simulate DB error
self.handler.on_file_received(test_file_path)
# Verify DB interaction attempted
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
self.mock_db_cursor.execute.assert_called_once()
# Verify commit/close not called after error
self.mock_db_conn.commit.assert_not_called()
self.mock_db_conn.close.assert_not_called() # Should close be called in finally? Original code doesn't.
# Verify file was NOT removed
mock_os_remove.assert_not_called()
# Verify error logging
self.mock_logging.error.assert_any_call(f'File {test_file_path} not loaded. Held in user path.')
self.mock_logging.error.assert_any_call(f'{db_error}')
@patch('FtpCsvReceiver.os.remove')
def test_on_incomplete_file_received(self, mock_os_remove):
test_file_path = '/fake/ftp/root/user1/incomplete.part'
self.handler.on_incomplete_file_received(test_file_path)
mock_os_remove.assert_called_once_with(test_file_path)
@patch('FtpCsvReceiver.Path')
@patch('FtpCsvReceiver.os.path.basename', return_value='newuser')
def test_ftp_SITE_ADDU_success(self, mock_basename, mock_path_constructor):
mock_path_instance = MagicMock()
mock_path_constructor.return_value = mock_path_instance
password = 'newpassword'
expected_hash = sha256(password.encode("UTF-8")).hexdigest()
expected_home = self.mock_cfg.virtpath + 'newuser'
self.handler.ftp_SITE_ADDU(f'newuser {password}')
# Verify path creation
mock_path_constructor.assert_called_once_with(expected_home)
mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True)
# Verify authorizer call
self.handler.authorizer.add_user.assert_called_once_with(
'newuser', expected_hash, expected_home + '/', perm=self.mock_cfg.defperm # Note: Original code adds trailing slash here
)
# Verify DB interaction
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('newuser', '{expected_hash}', '{expected_home}', '{self.mock_cfg.defperm}')"
self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
self.mock_db_conn.commit.assert_called_once()
self.mock_db_conn.close.assert_called_once()
# Verify response
self.handler.respond.assert_called_once_with('200 SITE ADDU successful.')
# Verify logging
self.mock_logging.info.assert_called_with('User newuser created.')
def test_ftp_SITE_ADDU_missing_args(self):
self.handler.ftp_SITE_ADDU('newuser') # Missing password
self.handler.respond.assert_called_once_with('501 SITE ADDU failed. Command needs 2 arguments')
self.handler.authorizer.add_user.assert_not_called()
self.mock_conn_db.assert_not_called()
@patch('FtpCsvReceiver.Path')
@patch('FtpCsvReceiver.os.path.basename', return_value='newuser')
def test_ftp_SITE_ADDU_mkdir_error(self, mock_basename, mock_path_constructor):
mock_path_instance = MagicMock()
mock_path_constructor.return_value = mock_path_instance
error = OSError("Cannot create dir")
mock_path_instance.mkdir.side_effect = error
self.handler.ftp_SITE_ADDU('newuser newpassword')
self.handler.respond.assert_called_once_with(f'551 Error in create virtual user path: {error}')
self.handler.authorizer.add_user.assert_not_called()
self.mock_conn_db.assert_not_called()
@patch('FtpCsvReceiver.Path')
@patch('FtpCsvReceiver.os.path.basename', return_value='newuser')
def test_ftp_SITE_ADDU_db_error(self, mock_basename, mock_path_constructor):
mock_path_instance = MagicMock()
mock_path_constructor.return_value = mock_path_instance
error = Exception("DB insert failed")
self.mock_db_cursor.execute.side_effect = error
self.handler.ftp_SITE_ADDU('newuser newpassword')
# Verify mkdir called
mock_path_instance.mkdir.assert_called_once()
# Verify authorizer called (happens before DB)
self.handler.authorizer.add_user.assert_called_once()
# Verify DB interaction attempted
self.mock_conn_db.assert_called_once()
self.mock_db_cursor.execute.assert_called_once()
# Verify response
self.handler.respond.assert_called_once_with(f'501 SITE ADDU failed: {error}.')
@patch('FtpCsvReceiver.os.path.basename', return_value='olduser')
def test_ftp_SITE_DELU_success(self, mock_basename):
self.handler.ftp_SITE_DELU('olduser')
# Verify authorizer call
self.handler.authorizer.remove_user.assert_called_once_with('olduser')
# Verify DB interaction
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
expected_sql = f"UPDATE {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = now() WHERE ftpuser = 'olduser'"
self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
self.mock_db_conn.commit.assert_called_once()
self.mock_db_conn.close.assert_called_once()
# Verify response
self.handler.respond.assert_called_once_with('200 SITE DELU successful.')
# Verify logging
self.mock_logging.info.assert_called_with('User olduser deleted.')
@patch('FtpCsvReceiver.os.path.basename', return_value='olduser')
def test_ftp_SITE_DELU_error(self, mock_basename):
error = Exception("DB update failed")
self.mock_db_cursor.execute.side_effect = error
self.handler.ftp_SITE_DELU('olduser')
# Verify authorizer call (happens first)
self.handler.authorizer.remove_user.assert_called_once_with('olduser')
# Verify DB interaction attempted
self.mock_conn_db.assert_called_once()
self.mock_db_cursor.execute.assert_called_once()
# Verify response
self.handler.respond.assert_called_once_with('501 SITE DELU failed.')
@patch('FtpCsvReceiver.Path')
@patch('FtpCsvReceiver.os.path.basename', return_value='restoreme')
def test_ftp_SITE_RESU_success(self, mock_basename, mock_path_constructor):
mock_path_instance = MagicMock()
mock_path_constructor.return_value = mock_path_instance
user_data = ('restoreme', 'somehash', '/fake/ftp/root/restoreme', 'elmw')
self.mock_db_cursor.fetchone.return_value = user_data
self.handler.ftp_SITE_RESU('restoreme')
# Verify DB interaction
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
expected_update_sql = f"UPDATE {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = null WHERE ftpuser = 'restoreme'"
expected_select_sql = f"SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE ftpuser = 'restoreme'"
expected_db_calls = [
call(expected_update_sql),
call(expected_select_sql)
]
self.mock_db_cursor.execute.assert_has_calls(expected_db_calls)
self.mock_db_conn.commit.assert_called_once() # For the update
self.mock_db_cursor.fetchone.assert_called_once()
# Verify authorizer call
self.handler.authorizer.add_user.assert_called_once_with(*user_data)
# Verify path creation
mock_path_constructor.assert_called_once_with(self.mock_cfg.virtpath + 'restoreme')
mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True)
# Verify DB close
self.mock_db_conn.close.assert_called_once()
# Verify response
self.handler.respond.assert_called_once_with('200 SITE RESU successful.')
# Verify logging
self.mock_logging.info.assert_called_with('User restoreme restored.')
@patch('FtpCsvReceiver.os.path.basename', return_value='restoreme')
def test_ftp_SITE_RESU_db_error(self, mock_basename):
error = Exception("DB fetch failed")
# Simulate error on the SELECT statement
self.mock_db_cursor.execute.side_effect = [None, error] # First call (UPDATE) ok, second (SELECT) fails
self.handler.ftp_SITE_RESU('restoreme')
# Verify DB interaction attempted
self.mock_conn_db.assert_called_once()
self.assertEqual(self.mock_db_cursor.execute.call_count, 2) # Both UPDATE and SELECT attempted
self.mock_db_conn.commit.assert_called_once() # Commit for UPDATE happened
# Verify response
self.handler.respond.assert_called_once_with('501 SITE RESU failed.')
# Verify authorizer not called, mkdir not called
self.handler.authorizer.add_user.assert_not_called()
def test_ftp_SITE_LSTU_success(self):
user_list_data = [
('userA', 'elr'),
('userB', 'elmw'),
]
self.mock_db_cursor.fetchall.return_value = user_list_data
self.handler.ftp_SITE_LSTU('') # No argument needed
# Verify DB interaction
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
expected_sql = f'SELECT ftpuser, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL '
self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
self.mock_db_cursor.fetchall.assert_called_once()
# Verify push calls
expected_push_calls = [
call("214-The following virtual users are defined:\r\n"),
call('Username: userA\tPerms: elr\r\nUsername: userB\tPerms: elmw\r\n')
]
self.handler.push.assert_has_calls(expected_push_calls)
# Verify final response
self.handler.respond.assert_called_once_with("214 LSTU SITE command successful.")
def test_ftp_SITE_LSTU_db_error(self):
error = Exception("DB select failed")
self.mock_db_cursor.execute.side_effect = error
self.handler.ftp_SITE_LSTU('')
# Verify DB interaction attempted
self.mock_conn_db.assert_called_once()
self.mock_db_cursor.execute.assert_called_once()
# Verify response
self.handler.respond.assert_called_once_with(f'501 list users failed: {error}')
# Verify push not called
self.handler.push.assert_not_called()
if __name__ == '__main__':
unittest.main(argv=['first-arg-is-ignored'], exit=False)

View File

@@ -29,3 +29,10 @@ class Config:
self.dbrawdata = c.get("tables", "rawTableName") self.dbrawdata = c.get("tables", "rawTableName")
self.dbrawdata = c.get("tables", "rawTableName") self.dbrawdata = c.get("tables", "rawTableName")
self.dbnodes = c.get("tables", "nodesTableName") self.dbnodes = c.get("tables", "nodesTableName")
# Matlab
self.matlab_runtime = c.get("matlab", "runtime")
self.matlab_func_path = c.get("matlab", "func_path")
self.matlab_timeout = c.getint("matlab", "timeout")
self.matlab_error = c.get("matlab", "error")
self.matlab_error_path = c.get("matlab", "error_path")

View File

@@ -8,7 +8,17 @@ from itertools import islice
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def get_data(cfg: object, id: int, pool) -> tuple: async def get_data(cfg: object, id: int, pool: object) -> tuple:
"""
Retrieves unit name, tool name, and tool data for a given record ID from the database.
Args:
cfg (object): Configuration object containing database table name.
id (int): The ID of the record to retrieve.
pool (object): The database connection pool.
Returns:
tuple: A tuple containing unit_name, tool_name, and tool_data.
"""
async with pool.acquire() as conn: async with pool.acquire() as conn:
async with conn.cursor() as cur: async with conn.cursor() as cur:
await cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbrectable} where id = {id}') await cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbrectable} where id = {id}')
@@ -16,7 +26,17 @@ async def get_data(cfg: object, id: int, pool) -> tuple:
return unit_name, tool_name, tool_data return unit_name, tool_name, tool_data
async def make_pipe_sep_matrix(cfg: object, id: int, pool) -> list: async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list:
"""
Processes pipe-separated data from a CSV record into a structured matrix.
Args:
cfg (object): Configuration object.
id (int): The ID of the CSV record.
pool (object): The database connection pool.
Returns:
list: A list of lists, where each inner list represents a row in the matrix.
"""
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
righe = ToolData.splitlines() righe = ToolData.splitlines()
matrice_valori = [] matrice_valori = []
@@ -38,7 +58,17 @@ async def make_pipe_sep_matrix(cfg: object, id: int, pool) -> list:
return matrice_valori return matrice_valori
async def make_ain_din_matrix(cfg: object, id: int, pool) -> list: async def make_ain_din_matrix(cfg: object, id: int, pool: object) -> list:
"""
Processes analog and digital input data from a CSV record into a structured matrix.
Args:
cfg (object): Configuration object.
id (int): The ID of the CSV record.
pool (object): The database connection pool.
Returns:
list: A list of lists, where each inner list represents a row in the matrix.
"""
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName) node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
righe = ToolData.splitlines() righe = ToolData.splitlines()
@@ -62,7 +92,17 @@ async def make_ain_din_matrix(cfg: object, id: int, pool) -> list:
return matrice_valori return matrice_valori
async def make_channels_matrix(cfg: object, id: int, pool) -> list: async def make_channels_matrix(cfg: object, id: int, pool: object) -> list:
"""
Processes channel-based data from a CSV record into a structured matrix.
Args:
cfg (object): Configuration object.
id (int): The ID of the CSV record.
pool (object): The database connection pool.
Returns:
list: A list of lists, where each inner list represents a row in the matrix.
"""
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName) node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
righe = ToolData.splitlines() righe = ToolData.splitlines()
@@ -80,7 +120,17 @@ async def make_channels_matrix(cfg: object, id: int, pool) -> list:
return matrice_valori return matrice_valori
async def make_musa_matrix(cfg: object, id: int, pool) -> list: async def make_musa_matrix(cfg: object, id: int, pool: object) -> list:
"""
Processes 'Musa' specific data from a CSV record into a structured matrix.
Args:
cfg (object): Configuration object.
id (int): The ID of the CSV record.
pool (object): The database connection pool.
Returns:
list: A list of lists, where each inner list represents a row in the matrix.
"""
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName) node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
righe = ToolData.splitlines() righe = ToolData.splitlines()
@@ -103,7 +153,17 @@ async def make_musa_matrix(cfg: object, id: int, pool) -> list:
return matrice_valori return matrice_valori
async def make_tlp_matrix(cfg: object, id: int, pool) -> list: async def make_tlp_matrix(cfg: object, id: int, pool: object) -> list:
"""
Processes 'TLP' specific data from a CSV record into a structured matrix.
Args:
cfg (object): Configuration object.
id (int): The ID of the CSV record.
pool (object): The database connection pool.
Returns:
list: A list of lists, where each inner list represents a row in the matrix.
"""
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
righe = ToolData.splitlines() righe = ToolData.splitlines()
valori_x_nodo = 2 valori_x_nodo = 2
@@ -120,7 +180,17 @@ async def make_tlp_matrix(cfg: object, id: int, pool) -> list:
async def make_gd_matrix(cfg: object, id: int, pool) -> list: async def make_gd_matrix(cfg: object, id: int, pool: object) -> list:
"""
Processes 'GD' specific data from a CSV record into a structured matrix.
Args:
cfg (object): Configuration object.
id (int): The ID of the CSV record.
pool (object): The database connection pool.
Returns:
list: A list of lists, where each inner list represents a row in the matrix.
"""
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
righe = ToolData.splitlines() righe = ToolData.splitlines()
matrice_valori = [] matrice_valori = []

View File

@@ -6,7 +6,16 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool, action: str) -> None: async def main_loader(cfg: object, id: int, pool: object, action: str) -> None:
"""
Main loader function to process CSV data based on the specified action.
Args:
cfg (object): Configuration object.
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
action (str): The type of data processing to perform (e.g., "pipe_separator", "analogic_digital").
"""
type_matrix_mapping = { type_matrix_mapping = {
"pipe_separator": make_pipe_sep_matrix, "pipe_separator": make_pipe_sep_matrix,
"analogic_digital": make_ain_din_matrix, "analogic_digital": make_ain_din_matrix,
@@ -27,3 +36,39 @@ async def main_loader(cfg: object, id: int, pool, action: str) -> None:
await unlock(cfg, id, pool) await unlock(cfg, id, pool)
else: else:
logger.warning(f"Action '{action}' non riconosciuta.") logger.warning(f"Action '{action}' non riconosciuta.")
async def get_next_csv_atomic(pool, table_name, status):
"""Preleva atomicamente il prossimo CSV da elaborare"""
async with pool.acquire() as conn:
# IMPORTANTE: Disabilita autocommit per questa transazione
await conn.begin()
try:
async with conn.cursor() as cur:
# Usa SELECT FOR UPDATE per lock atomico
await cur.execute(f"""
SELECT id, unit_type, tool_type, unit_name, tool_name
FROM {table_name}
WHERE locked = 0 AND status = %s
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED
""", (status,))
result = await cur.fetchone()
if result:
await cur.execute(f"""
UPDATE {table_name}
SET locked = 1
WHERE id = %s
""", (result[0],))
# Commit esplicito per rilasciare il lock
await conn.commit()
return result
except Exception as e:
# Rollback in caso di errore
await conn.rollback()
raise e

View File

@@ -1,9 +1,7 @@
import re import re
def extract_value(patterns: list, primary_source: str, secondary_source: str, default='Not Defined') -> str: def extract_value(patterns: list, primary_source: str, secondary_source: str, default='Not Defined') -> str:
"""Extracts the first match for a list of patterns from the primary source.
Falls back to the secondary source if no match is found.
"""
for source in (primary_source, secondary_source): for source in (primary_source, secondary_source):
for pattern in patterns: for pattern in patterns:
matches = re.findall(pattern, source, re.IGNORECASE) matches = re.findall(pattern, source, re.IGNORECASE)

View File

@@ -1,5 +1,6 @@
import logging import logging
import mysql.connector import mysql.connector
from mysql.connector import Error
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -20,10 +21,14 @@ def connetti_db(cfg: object) -> object:
A MySQL connection object if the connection is successful, otherwise None. A MySQL connection object if the connection is successful, otherwise None.
""" """
try: try:
conn = mysql.connector.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport, database=cfg.dbname) conn = mysql.connector.connect(user=cfg.dbuser,
password=cfg.dbpass,
host=cfg.dbhost,
port=cfg.dbport,
database=cfg.dbname)
conn.autocommit = True conn.autocommit = True
logger.info("Connected") logger.info("Connected")
return conn return conn
except mysql.connector.Error as e: except Error as e:
logger.error(f"Database connection error: {e}") logger.error(f"Database connection error: {e}")
raise # Re-raise the exception to be handled by the caller raise # Re-raise the exception to be handled by the caller

View File

@@ -4,14 +4,28 @@ import asyncio
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
timestamp_cols = ['inserted_at', 'loaded_at', 'elaborated_at'] timestamp_cols = ["inserted_at", "loaded_at", "elaborated_at"]
async def load_data(cfg: object, matrice_valori: list, pool) -> bool : async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
"""Carica una lista di record di dati grezzi nel database.
Esegue un'operazione di inserimento massivo (executemany) per caricare i dati.
Utilizza la clausola 'ON DUPLICATE KEY UPDATE' per aggiornare i record esistenti.
Implementa una logica di re-tentativo in caso di deadlock.
Args:
cfg (object): L'oggetto di configurazione contenente i nomi delle tabelle e i parametri di re-tentativo.
matrice_valori (list): Una lista di tuple, dove ogni tupla rappresenta una riga da inserire.
pool (object): Il pool di connessioni al database.
Returns:
bool: True se il caricamento ha avuto successo, False altrimenti.
"""
if not matrice_valori: if not matrice_valori:
logger.info("Nulla da caricare.") logger.info("Nulla da caricare.")
return True return True
sql_insert_RAWDATA = f''' sql_insert_RAWDATA = f"""
INSERT INTO {cfg.dbrawdata} ( INSERT INTO {cfg.dbrawdata} (
`UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`, `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`,
`Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`, `Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`,
@@ -47,7 +61,7 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool :
`TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, {cfg.dbrawdata}.`TemperatureModule`), `TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, {cfg.dbrawdata}.`TemperatureModule`),
`RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`), `RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`),
`Created_at` = NOW() `Created_at` = NOW()
''' """
rc = False rc = False
async with pool.acquire() as conn: async with pool.acquire() as conn:
@@ -65,10 +79,12 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool :
logging.error(f"Error: {e}.") logging.error(f"Error: {e}.")
if e.args[0] == 1213: # Deadlock detected if e.args[0] == 1213: # Deadlock detected
logging.warning(f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}") logging.warning(
f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}"
)
if attempt < cfg.max_retries - 1: if attempt < cfg.max_retries - 1:
delay = (2 * attempt) delay = 2 * attempt
await asyncio.sleep(delay) await asyncio.sleep(delay)
continue continue
else: else:
@@ -76,29 +92,64 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool :
raise raise
return rc return rc
async def update_status(cfg: object, id: int, status: int, pool) -> None:
async def update_status(cfg: object, id: int, status: int, pool: object) -> None:
"""Aggiorna lo stato di un record nella tabella dei record CSV.
Args:
cfg (object): L'oggetto di configurazione contenente il nome della tabella.
id (int): L'ID del record da aggiornare.
status (int): Il nuovo stato da impostare.
pool (object): Il pool di connessioni al database.
"""
async with pool.acquire() as conn: async with pool.acquire() as conn:
async with conn.cursor() as cur: async with conn.cursor() as cur:
try: try:
await cur.execute(f'update {cfg.dbrectable} set status = {status}, {timestamp_cols[status]} = now() where id = {id}') await cur.execute(
f"update {cfg.dbrectable} set status = {status}, {timestamp_cols[status]} = now() where id = {id}"
)
await conn.commit() await conn.commit()
logging.info("Status updated.") logging.info("Status updated.")
except Exception as e: except Exception as e:
await conn.rollback() await conn.rollback()
logging.error(f'Error: {e}') logging.error(f"Error: {e}")
async def unlock(cfg: object, id: int, pool) -> None:
async def unlock(cfg: object, id: int, pool: object) -> None:
"""Sblocca un record nella tabella dei record CSV.
Imposta il campo 'locked' a 0 per un dato ID.
Args:
cfg (object): L'oggetto di configurazione contenente il nome della tabella.
id (int): L'ID del record da sbloccare.
pool (object): Il pool di connessioni al database.
"""
async with pool.acquire() as conn: async with pool.acquire() as conn:
async with conn.cursor() as cur: async with conn.cursor() as cur:
try: try:
await cur.execute(f'update {cfg.dbrectable} set locked = 0 where id = {id}') await cur.execute(
f"update {cfg.dbrectable} set locked = 0 where id = {id}"
)
await conn.commit() await conn.commit()
logging.info(f"id {id} unlocked.") logging.info(f"id {id} unlocked.")
except Exception as e: except Exception as e:
await conn.rollback() await conn.rollback()
logging.error(f'Error: {e}') logging.error(f"Error: {e}")
async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool) -> tuple:
async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool: object) -> tuple:
"""Recupera le informazioni per l'esecuzione di un comando Matlab dal database.
Args:
cfg (object): L'oggetto di configurazione.
unit (str): Il nome dell'unità.
tool (str): Il nome dello strumento.
pool (object): Il pool di connessioni al database.
Returns:
tuple: Una tupla contenente le informazioni del comando Matlab, o None in caso di errore.
"""
async with pool.acquire() as conn: async with pool.acquire() as conn:
async with conn.cursor() as cur: async with conn.cursor() as cur:
try: try:
@@ -110,4 +161,4 @@ async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool) -> tuple:
where t.name = "{tool}" and u.name = "{unit}"''') where t.name = "{tool}" and u.name = "{unit}"''')
return cur.fetchone() return cur.fetchone()
except Exception as e: except Exception as e:
logging.error(f'Error: {e}') logging.error(f"Error: {e}")

View File

@@ -0,0 +1,39 @@
import logging
import aiomysql
logger = logging.getLogger(__name__)
async def get_matlab_command(cfg: object, tool: str, unit: str, pool: object) -> tuple:
"""Recupera le informazioni per l'esecuzione di un comando Matlab dal database.
Interroga il database per ottenere i dettagli necessari all'avvio di uno script
Matlab, basandosi sul nome dello strumento (tool) e dell'unità (unit).
Args:
cfg (object): L'oggetto di configurazione.
tool (str): Il nome dello strumento.
unit (str): Il nome dell'unità.
pool (object): Il pool di connessioni al database.
Returns:
tuple: Una tupla contenente le informazioni del comando Matlab,
o None se non viene trovato alcun comando.
"""
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(f"""
SELECT m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate from matfuncs as m
INNER JOIN tools as t on t.matfunc = m.id
INNER JOIN units as u on u.id = t.unit_id
INNER JOIN statustools as s on t.statustool_id = s.id
where t.name = '{tool}' AND u.name = '{unit}';
""")
result = await cur.fetchone()
if not result:
logger.error(f"{unit} - {tool}: Matlab command not found.")
return None
else:
return result

View File

@@ -1,13 +1,26 @@
from utils.database.connection import connetti_db
import aiomysql
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def get_nodes_type(cfg: object, tool: str, unit: str) -> tuple: async def get_nodes_type(cfg: object, tool: str, unit: str, pool: object) -> tuple:
"""Recupera le informazioni sui nodi (tipo, canali, input) per un dato strumento e unità.
with connetti_db(cfg) as conn: Args:
cur = conn.cursor(dictionary=True) cfg (object): L'oggetto di configurazione.
query = f""" tool (str): Il nome dello strumento.
unit (str): Il nome dell'unità.
pool (object): Il pool di connessioni al database.
Returns:
tuple: Una tupla contenente quattro liste: canali, tipi, ain, din.
Se non vengono trovati risultati, restituisce (None, None, None, None).
"""
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(f"""
SELECT t.name AS name, n.seq AS seq, n.num AS num, n.channels AS channels, y.type AS type, n.ain AS ain, n.din AS din SELECT t.name AS name, n.seq AS seq, n.num AS num, n.channels AS channels, y.type AS type, n.ain AS ain, n.din AS din
FROM {cfg.dbname}.{cfg.dbnodes} AS n FROM {cfg.dbname}.{cfg.dbnodes} AS n
INNER JOIN tools AS t ON t.id = n.tool_id INNER JOIN tools AS t ON t.id = n.tool_id
@@ -15,13 +28,10 @@ def get_nodes_type(cfg: object, tool: str, unit: str) -> tuple:
INNER JOIN nodetypes AS y ON n.nodetype_id = y.id INNER JOIN nodetypes AS y ON n.nodetype_id = y.id
WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}' WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}'
ORDER BY n.num; ORDER BY n.num;
""" """)
#logger.info(f"{unit} - {tool}: Executing query: {query}")
cur.execute(query) results = await cur.fetchall()
results = cur.fetchall()
logger.info(f"{unit} - {tool}: {cur.rowcount} rows selected to get node type/Ain/Din/channels.") logger.info(f"{unit} - {tool}: {cur.rowcount} rows selected to get node type/Ain/Din/channels.")
cur.close()
conn.close()
if not results: if not results:
logger.info(f"{unit} - {tool}: Node/Channels/Ain/Din not defined.") logger.info(f"{unit} - {tool}: Node/Channels/Ain/Din not defined.")
@@ -34,4 +44,3 @@ def get_nodes_type(cfg: object, tool: str, unit: str) -> tuple:
ains.append(row['ain']) ains.append(row['ain'])
dins.append(row['din']) dins.append(row['din'])
return channels, types, ains, dins return channels, types, ains, dins

View File

@@ -10,7 +10,11 @@ from utils.database.connection import connetti_db
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def ftp_SITE_ADDU(self: object, line: str) -> None: def ftp_SITE_ADDU(self: object, line: str) -> None:
"""Adds a virtual user, creates their directory, and saves their details to the database. """
Adds a virtual user, creates their directory, and saves their details to the database.
Args:
line (str): A string containing the username and password separated by a space.
""" """
cfg = self.cfg cfg = self.cfg
try: try:
@@ -51,7 +55,12 @@ def ftp_SITE_ADDU(self: object, line: str) -> None:
print(e) print(e)
def ftp_SITE_DISU(self: object, line: str) -> None: def ftp_SITE_DISU(self: object, line: str) -> None:
"""Removes a virtual user from the authorizer and marks them as deleted in the database.""" """
Removes a virtual user from the authorizer and marks them as deleted in the database.
Args:
line (str): A string containing the username to be disabled.
"""
cfg = self.cfg cfg = self.cfg
parms = line.split() parms = line.split()
user = os.path.basename(parms[0]) # Extract the username user = os.path.basename(parms[0]) # Extract the username
@@ -78,7 +87,12 @@ def ftp_SITE_DISU(self: object, line: str) -> None:
print(e) print(e)
def ftp_SITE_ENAU(self: object, line: str) -> None: def ftp_SITE_ENAU(self: object, line: str) -> None:
"""Restores a virtual user by updating their status in the database and adding them back to the authorizer.""" """
Restores a virtual user by updating their status in the database and adding them back to the authorizer.
Args:
line (str): A string containing the username to be enabled.
"""
cfg = self.cfg cfg = self.cfg
parms = line.split() parms = line.split()
user = os.path.basename(parms[0]) # Extract the username user = os.path.basename(parms[0]) # Extract the username
@@ -117,7 +131,12 @@ def ftp_SITE_ENAU(self: object, line: str) -> None:
print(e) print(e)
def ftp_SITE_LSTU(self: object, line: str) -> None: def ftp_SITE_LSTU(self: object, line: str) -> None:
"""Lists all virtual users from the database.""" """
Lists all virtual users from the database.
Args:
line (str): An empty string (no arguments needed for this command).
"""
cfg = self.cfg cfg = self.cfg
users_list = [] users_list = []
try: try:

104
utils/orchestrator_utils.py Normal file
View File

@@ -0,0 +1,104 @@
import logging
import asyncio
import os
import aiomysql
import contextvars
from typing import Callable, Coroutine, Any
# Crea una context variable per identificare il worker
worker_context = contextvars.ContextVar("worker_id", default="^-^")
# Formatter personalizzato che include il worker_id
class WorkerFormatter(logging.Formatter):
"""Formatter personalizzato per i log che include l'ID del worker."""
def format(self, record: logging.LogRecord) -> str:
"""Formatta il record di log includendo l'ID del worker.
Args:
record (str): Il record di log da formattare.
Returns:
La stringa formattata del record di log.
"""
record.worker_id = worker_context.get()
return super().format(record)
def setup_logging(log_filename: str, log_level_str: str):
"""Configura il logging globale.
Args:
log_filename (str): Percorso del file di log.
log_level_str (str): Livello di log (es. "INFO", "DEBUG").
"""
logger = logging.getLogger()
handler = logging.FileHandler(log_filename)
formatter = WorkerFormatter(
"%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s"
)
handler.setFormatter(formatter)
# Rimuovi eventuali handler esistenti e aggiungi il nostro
if logger.hasHandlers():
logger.handlers.clear()
logger.addHandler(handler)
log_level = getattr(logging, log_level_str.upper(), logging.INFO)
logger.setLevel(log_level)
logger.info("Logging configurato correttamente")
async def run_orchestrator(
config_class: Any,
worker_coro: Callable[[int, Any, Any], Coroutine[Any, Any, None]],
):
"""Funzione principale che inizializza e avvia un orchestratore.
Args:
config_class: La classe di configurazione da istanziare.
worker_coro: La coroutine del worker da eseguire in parallelo.
"""
logger = logging.getLogger()
logger.info("Avvio del sistema...")
cfg = config_class()
logger.info("Configurazione caricata correttamente")
debug_mode = False
try:
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
setup_logging(cfg.logfilename, log_level)
debug_mode = logger.getEffectiveLevel() == logging.DEBUG
logger.info(f"Avvio di {cfg.max_threads} worker concorrenti")
pool = await aiomysql.create_pool(
host=cfg.dbhost,
user=cfg.dbuser,
password=cfg.dbpass,
db=cfg.dbname,
minsize=cfg.max_threads,
maxsize=cfg.max_threads * 4,
pool_recycle=3600,
)
tasks = [
asyncio.create_task(worker_coro(i, cfg, pool))
for i in range(cfg.max_threads)
]
logger.info("Sistema avviato correttamente. In attesa di nuovi task...")
try:
await asyncio.gather(*tasks, return_exceptions=debug_mode)
finally:
pool.close()
await pool.wait_closed()
except KeyboardInterrupt:
logger.info("Info: Shutdown richiesto... chiusura in corso")
except Exception as e:
logger.error(f"Errore principale: {e}", exc_info=debug_mode)

View File

@@ -1 +1 @@
"""Parser delle centraline""" """Parser delle centraline con le tipologie di unit e tool"""

View File

@@ -1 +1 @@
"""Parser delle centraline""" """Parser delle centraline con nomi di unit e tool"""

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'cr1000x_cr1000x'.
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
di elaborazione come "pipe_separator".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'd2w_d2w'.
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
di elaborazione come "pipe_separator".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as channels_main_loader from utils.csv.loaders import main_loader as channels_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'g201_g201'.
Questa funzione è un wrapper per `channels_main_loader` e passa il tipo
di elaborazione come "channels".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await channels_main_loader(cfg, id, pool,"channels") await channels_main_loader(cfg, id, pool,"channels")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'g301_g301'.
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
di elaborazione come "pipe_separator".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'g801_iptm'.
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
di elaborazione come "pipe_separator".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as analog_dig_main_loader from utils.csv.loaders import main_loader as analog_dig_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'g801_loc'.
Questa funzione è un wrapper per `analog_dig_main_loader` e passa il tipo
di elaborazione come "analogic_digital".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await analog_dig_main_loader(cfg, id, pool, "analogic_digital") await analog_dig_main_loader(cfg, id, pool, "analogic_digital")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'g801_mums'.
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
di elaborazione come "pipe_separator".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as musa_main_loader from utils.csv.loaders import main_loader as musa_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'g801_musa'.
Questa funzione è un wrapper per `musa_main_loader` e passa il tipo
di elaborazione come "musa".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await musa_main_loader(cfg, id, pool, "musa") await musa_main_loader(cfg, id, pool, "musa")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as channels_main_loader from utils.csv.loaders import main_loader as channels_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'g801_mux'.
Questa funzione è un wrapper per `channels_main_loader` e passa il tipo
di elaborazione come "channels".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await channels_main_loader(cfg, id, pool, "channels") await channels_main_loader(cfg, id, pool, "channels")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'g802_dsas'.
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
di elaborazione come "pipe_separator".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as gd_main_loader from utils.csv.loaders import main_loader as gd_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'g802_gd'.
Questa funzione è un wrapper per `gd_main_loader` e passa il tipo
di elaborazione come "gd".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await gd_main_loader(cfg, id, pool, "gd") await gd_main_loader(cfg, id, pool, "gd")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as analog_dig_main_loader from utils.csv.loaders import main_loader as analog_dig_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'g802_loc'.
Questa funzione è un wrapper per `analog_dig_main_loader` e passa il tipo
di elaborazione come "analogic_digital".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await analog_dig_main_loader(cfg, id, pool, "analogic_digital") await analog_dig_main_loader(cfg, id, pool, "analogic_digital")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'g802_modb'.
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
di elaborazione come "pipe_separator".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'g802_mums'.
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
di elaborazione come "pipe_separator".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as channels_main_loader from utils.csv.loaders import main_loader as channels_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'g802_mux'.
Questa funzione è un wrapper per `channels_main_loader` e passa il tipo
di elaborazione come "channels".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await channels_main_loader(cfg, id, pool, "channels") await channels_main_loader(cfg, id, pool, "channels")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as tlp_main_loader from utils.csv.loaders import main_loader as tlp_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'gs1_gs1'.
Questa funzione è un wrapper per `tlp_main_loader` e passa il tipo
di elaborazione come "tlp".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await tlp_main_loader(cfg, id, pool, "tlp") await tlp_main_loader(cfg, id, pool, "tlp")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'hortus_hortus'.
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
di elaborazione come "pipe_separator".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator") await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,2 +1,35 @@
async def main_loader(cfg: object, id: int, pool) -> None: import subprocess
pass import tempfile
import os
from utils.database.loader_action import DATA_LOADED, update_status, unlock
from utils.csv.data_preparation import get_data
import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Creare un file temporaneo
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as temp_file:
temp_file.write(ToolData)
temp_filename = temp_file.name
try:
# Eseguire il programma con il file temporaneo
result = await subprocess.run(['python3', 'old_script/TS_PiniScript.py', temp_filename], capture_output=True, text=True)
print(result.stdout)
print(result.stderr)
finally:
# Pulire il file temporaneo
os.unlink(temp_filename)
if result.returncode != 0:
logger.error(f"Errore nell'esecuzione del programma TS_PiniScript.py: {result.stderr}")
raise Exception(f"Errore nel programma: {result.stderr}")
else:
logger.info(f"Programma TS_PiniScript.py eseguito con successo: {result.stdout}")
await update_status(cfg, id, DATA_LOADED, pool)
await unlock(cfg, id, pool)

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as analog_dig_main_loader from utils.csv.loaders import main_loader as analog_dig_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'tlp_loc'.
Questa funzione è un wrapper per `analog_dig_main_loader` e passa il tipo
di elaborazione come "analogic_digital".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await analog_dig_main_loader(cfg, id, pool, "analogic_digital") await analog_dig_main_loader(cfg, id, pool, "analogic_digital")

View File

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as tlp_main_loader from utils.csv.loaders import main_loader as tlp_main_loader
async def main_loader(cfg: object, id: int, pool) -> None: async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Carica ed elabora i dati CSV specifici per il tipo 'tlp_tlp'.
Questa funzione è un wrapper per `tlp_main_loader` e passa il tipo
di elaborazione come "tlp".
Args:
cfg (object): L'oggetto di configurazione.
id (int): L'ID del record CSV da elaborare.
pool (object): Il pool di connessioni al database.
"""
await tlp_main_loader(cfg, id, pool, "tlp") await tlp_main_loader(cfg, id, pool, "tlp")