Compare commits
5 commits: 2c67956505 ... f003ba68ed

| SHA1 |
|---|
| f003ba68ed |
| 7edaef3563 |
| b1ce9061b1 |
| 0022d0e326 |
| 301aa53c72 |
@@ -1,77 +1,82 @@
#!.venv/bin/python

# Import necessary libraries
import mysql.connector
import logging
import importlib
import time
import asyncio
import subprocess

# Import custom modules for configuration and database connection
from utils.config import loader_ftp_csv as setting
from utils.database.connection import connetti_db
from utils.database.loader_action import get_matlab_cmd
from utils.config import loader_matlab_elab as setting
from utils.database import DATA_LOADED
from utils.database.matlab_query import get_matlab_command
from utils.csv.loaders import get_next_csv_atomic
from utils.orchestrator_utils import run_orchestrator, worker_context

# Initialize the logger for this module
logger = logging.getLogger(__name__)
logger = logging.getLogger()

# Function to elaborate CSV data
async def run_matlab_elab(id: int, unit_name: str, unit_type: str, tool_name: str, tool_type: str, semaphore: asyncio.Semaphore) -> bool:
async with semaphore:
if get_matlab_cmd(cfg, unit_name, tool_name):
# If a record is found, lock it by updating the 'locked' field to 1
# Delay tra un processamento CSV e il successivo (in secondi)
ELAB_PROCESSING_DELAY = 0.2
# Tempo di attesa se non ci sono record da elaborare
NO_RECORD_SLEEP = 60

async def worker(worker_id: int, cfg: object, pool: object) -> None:
"""Esegue il ciclo di lavoro per l'elaborazione dei dati caricati.

Il worker preleva un record dal database che indica dati pronti per
l'elaborazione, esegue un comando Matlab associato e attende
prima di iniziare un nuovo ciclo.

Args:
worker_id (int): L'ID univoco del worker.
cfg (object): L'oggetto di configurazione.
pool (object): Il pool di connessioni al database.
"""
# Imposta il context per questo worker
worker_context.set(f"W{worker_id:02d}")

debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
logger.info("Avviato")

while True:
try:
logger.info("Inizio elaborazione")

record = await get_next_csv_atomic(pool, cfg.dbrectable, DATA_LOADED)

if record:
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
matlab_info = await get_matlab_command(cfg, tool_name, unit_name, pool)
if matlab_info:
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{matlab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name} {tool_name}"

# matlab_error_filename = f'{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt'

proc = await asyncio.create_subprocess_shell(
matlab_cmd,
cwd=cfg.matlab_func_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)

stdout, stderr = await proc.communicate()

if proc.returncode != 0:
logger.error("Errore durante l'elaborazione")
logger.error(stderr.decode().strip())
logger.info(stdout.decode().strip())
await asyncio.sleep(ELAB_PROCESSING_DELAY)
else:
logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP)

except Exception as e:
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
await asyncio.sleep(1)


async def main():
# Load the configuration settings
cfg = setting.Config()

try:
# Configure logging to write log messages to a file with a specific format
logging.basicConfig(
format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
filename=cfg.logfilename,
level=logging.INFO,
)

# Limita il numero di esecuzioni concorrenti a max_threads
semaphore = asyncio.Semaphore(cfg.max_threads)
running_tasks = set()

# Enter an infinite loop to continuously process records
while True:
try:
# Establish a database connection
with connetti_db(cfg) as conn:
cur = conn.cursor()
# Select a single record from the raw data table that is not currently locked and has a status of 0
cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = {DATA_LOADED} limit 1')
id, unit_name, unit_type, tool_name, tool_type = cur.fetchone()
if id:
task = asyncio.create_task(run_matlab_elab(id, unit_name, unit_type, tool_name, tool_type, semaphore))
running_tasks.add(task)
# Rimuovi i task completati dal set
running_tasks = {t for t in running_tasks if not t.done()}

# If a record was successfully processed, log the number of threads currently running
#logger.info(f"Threads in execution: {len(threads)}")
except Exception as e:
logger.info(f"Error: {e}.")

except KeyboardInterrupt:
# Handle a keyboard interrupt (e.g., Ctrl+C) to gracefully shut down the program
logger.info("Info: Shutdown requested...exiting")

except Exception as e:
logger.info(f"Error: {e}.")
"""Funzione principale che avvia l'elab_orchestrator."""
await run_orchestrator(setting.Config, worker)

if __name__ == "__main__":
asyncio.run(main())
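The worker above lower-cases and underscore-joins every string field of the claimed record before using it to pick parser modules and MATLAB scripts. A worked example of that comprehension, with made-up field values:

record = (42, "G801 Plus", "MUX", "ID 1234", "LOC 5678")
id, unit_type, tool_type, unit_name, tool_name = [
    x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record
]
print(id, unit_type, tool_type, unit_name, tool_name)
# 42 g801_plus mux id_1234 loc_5678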
env/elab.ini (vendored, 16 lines)
@@ -0,0 +1,16 @@
[logging]
logFilename = ./elab_data.log

[threads]
max_num = 10

[matlab]
#runtime = /usr/local/MATLAB/MATLAB_Runtime/v93
#func_path = /usr/local/matlab_func/
runtime = /home/alex/matlab_sym/
func_path = /home/alex/matlab_sym/
timeout = 1800
error = ""
error_path = /tmp/
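These keys are consumed through configparser by the Config loaders changed later in this compare (see the Config hunk further down). A minimal sketch of the mapping; the threads/max_num line is an assumption based on the [threads] section above:

from configparser import ConfigParser

c = ConfigParser()
c.read("env/elab.ini")
matlab_runtime = c.get("matlab", "runtime")      # /home/alex/matlab_sym/
matlab_func_path = c.get("matlab", "func_path")  # /home/alex/matlab_sym/
matlab_timeout = c.getint("matlab", "timeout")   # 1800
max_threads = c.getint("threads", "max_num")     # 10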
@@ -61,13 +61,13 @@ class DummySha256Authorizer(DummyAuthorizer):
class ASEHandler(FTPHandler):
"""Custom FTP handler that extends FTPHandler with custom commands and file handling."""

def __init__(self: object, conn: object, server: object, ioloop=None) -> None:
def __init__(self: object, conn: object, server: object, ioloop:object=None) -> None:
"""Initializes the handler, adds custom commands, and sets up command permissions.

Args:
conn: The connection object.
server: The FTP server object.
ioloop: The I/O loop object.
conn (object): The connection object.
server (object): The FTP server object.
ioloop (object): The I/O loop object.
"""
super().__init__(conn, server, ioloop)
self.proto_cmds = FTPHandler.proto_cmds.copy()
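proto_cmds is copied so the handler can register the custom SITE commands (ADDU, DELU, RESU, LSTU) that the removed test file below exercises. A minimal sketch of how such a command is typically wired into pyftpdlib; the permission flag and help text are assumptions, not taken from this diff:

from pyftpdlib.handlers import FTPHandler

class SiteAdduHandler(FTPHandler):
    def __init__(self, conn, server, ioloop=None):
        super().__init__(conn, server, ioloop)
        self.proto_cmds = FTPHandler.proto_cmds.copy()
        # "SITE ADDU user password" is dispatched to ftp_SITE_ADDU(line)
        self.proto_cmds['SITE ADDU'] = dict(
            perm='m', auth=True, arg=True,
            help='Syntax: SITE ADDU <SP> user password (add virtual user).')

    def ftp_SITE_ADDU(self, line):
        # line holds everything after "SITE ADDU"
        self.respond('200 SITE ADDU successful.')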
@@ -4,22 +4,12 @@
|
||||
import logging
|
||||
import importlib
|
||||
import asyncio
|
||||
import os
|
||||
import aiomysql
|
||||
import contextvars
|
||||
|
||||
# Import custom modules for configuration and database connection
|
||||
from utils.config import loader_load_data as setting
|
||||
from utils.database import CSV_RECEIVED
|
||||
|
||||
# Crea una context variable per identificare il worker
|
||||
worker_context = contextvars.ContextVar('worker_id', default='00')
|
||||
|
||||
# Formatter personalizzato che include il worker_id
|
||||
class WorkerFormatter(logging.Formatter):
|
||||
def format(self, record):
|
||||
record.worker_id = worker_context.get()
|
||||
return super().format(record)
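WorkerFormatter stamps every log record with the value of the worker_context variable, so concurrent workers sharing one root logger still produce distinguishable lines. A self-contained sketch of the same mechanism (format string simplified):

import asyncio
import contextvars
import logging

worker_context = contextvars.ContextVar('worker_id', default='00')

class WorkerFormatter(logging.Formatter):
    def format(self, record):
        record.worker_id = worker_context.get()
        return super().format(record)

handler = logging.StreamHandler()
handler.setFormatter(WorkerFormatter("Worker-%(worker_id)s: %(message)s"))
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)

async def worker(worker_id: int) -> None:
    # each asyncio task gets its own copy of the context
    worker_context.set(f"W{worker_id:02d}")
    logging.info("Avviato")   # -> "Worker-W01: Avviato", "Worker-W02: Avviato"

async def main():
    await asyncio.gather(worker(1), worker(2))

asyncio.run(main())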
|
||||
from utils.csv.loaders import get_next_csv_atomic
|
||||
from utils.orchestrator_utils import run_orchestrator, worker_context
|
||||
|
||||
# Initialize the logger for this module
|
||||
logger = logging.getLogger()
|
||||
@@ -29,53 +19,28 @@ CSV_PROCESSING_DELAY = 0.2
|
||||
# Tempo di attesa se non ci sono record da elaborare
|
||||
NO_RECORD_SLEEP = 60
|
||||
|
||||
async def get_next_csv_atomic(pool, table_name):
|
||||
"""Preleva atomicamente il prossimo CSV da elaborare"""
|
||||
async with pool.acquire() as conn:
|
||||
# IMPORTANTE: Disabilita autocommit per questa transazione
|
||||
await conn.begin()
|
||||
async def worker(worker_id: int, cfg: object, pool: object) -> None:
|
||||
"""Esegue il ciclo di lavoro per l'elaborazione dei file CSV.
|
||||
|
||||
try:
|
||||
async with conn.cursor() as cur:
|
||||
# Usa SELECT FOR UPDATE per lock atomico
|
||||
await cur.execute(f"""
|
||||
SELECT id, unit_type, tool_type, unit_name, tool_name
|
||||
FROM {table_name}
|
||||
WHERE locked = 0 AND status = %s
|
||||
ORDER BY id
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
""", (CSV_RECEIVED,))
|
||||
Il worker preleva un record CSV dal database, ne elabora il contenuto
|
||||
e attende prima di iniziare un nuovo ciclo.
|
||||
|
||||
result = await cur.fetchone()
|
||||
if result:
|
||||
await cur.execute(f"""
|
||||
UPDATE {table_name}
|
||||
SET locked = 1
|
||||
WHERE id = %s
|
||||
""", (result[0],))
|
||||
|
||||
# Commit esplicito per rilasciare il lock
|
||||
await conn.commit()
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
# Rollback in caso di errore
|
||||
await conn.rollback()
|
||||
raise e
|
||||
|
||||
async def worker(worker_id: int, cfg: object, pool) -> None:
|
||||
Args:
|
||||
worker_id (int): L'ID univoco del worker.
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
# Imposta il context per questo worker
|
||||
worker_context.set(f"W{worker_id}")
|
||||
worker_context.set(f"W{worker_id:02d}")
|
||||
|
||||
debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
|
||||
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
|
||||
logger.info("Avviato")
|
||||
|
||||
while True:
|
||||
try:
|
||||
logger.info("Inizio elaborazione")
|
||||
|
||||
record = await get_next_csv_atomic(pool, cfg.dbrectable)
|
||||
record = await get_next_csv_atomic(pool, cfg.dbrectable, CSV_RECEIVED)
|
||||
|
||||
if record:
|
||||
success = await load_csv(record, cfg, pool)
|
||||
@@ -83,25 +48,42 @@ async def worker(worker_id: int, cfg: object, pool) -> None:
|
||||
logger.error("Errore durante l'elaborazione")
|
||||
await asyncio.sleep(CSV_PROCESSING_DELAY)
|
||||
else:
|
||||
logger.debug("Nessun record disponibile")
|
||||
logger.info("Nessun record disponibile")
|
||||
await asyncio.sleep(NO_RECORD_SLEEP)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def load_csv(record: tuple, cfg: object, pool) -> bool:
|
||||
debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
|
||||
|
||||
async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
|
||||
"""Carica ed elabora un record CSV utilizzando il modulo di parsing appropriato.
|
||||
|
||||
Args:
|
||||
record: Una tupla contenente i dettagli del record CSV da elaborare (id, unit_type, tool_type, unit_name, tool_name).
|
||||
cfg: L'oggetto di configurazione contenente i parametri del sistema.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
|
||||
Returns:
|
||||
True se l'elaborazione del CSV è avvenuta con successo, False altrimenti.
|
||||
"""
|
||||
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
|
||||
logger.debug("Inizio ricerca nuovo CSV da elaborare")
|
||||
|
||||
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
|
||||
logger.info(f'Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}')
|
||||
id, unit_type, tool_type, unit_name, tool_name = [
|
||||
x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record
|
||||
]
|
||||
logger.info(
|
||||
f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}"
|
||||
)
|
||||
|
||||
# Costruisce il nome del modulo da caricare dinamicamente
|
||||
module_names = [f'utils.parsers.by_name.{unit_name}_{tool_name}',
|
||||
f'utils.parsers.by_name.{unit_name}_{tool_type}',
|
||||
f'utils.parsers.by_name.{unit_name}_all',
|
||||
f'utils.parsers.by_type.{unit_type}_{tool_type}']
|
||||
module_names = [
|
||||
f"utils.parsers.by_name.{unit_name}_{tool_name}",
|
||||
f"utils.parsers.by_name.{unit_name}_{tool_type}",
|
||||
f"utils.parsers.by_name.{unit_name}_all",
|
||||
f"utils.parsers.by_type.{unit_type}_{tool_type}",
|
||||
]
|
||||
modulo = None
|
||||
for module_name in module_names:
|
||||
try:
|
||||
@@ -110,7 +92,10 @@ async def load_csv(record: tuple, cfg: object, pool) -> bool:
|
||||
logger.info(f"Funzione 'main_loader' caricata dal modulo {module_name}")
|
||||
break
|
||||
except (ImportError, AttributeError) as e:
|
||||
logger.debug(f"Modulo {module_name} non presente o non valido. {e}", exc_info=debug_mode)
|
||||
logger.debug(
|
||||
f"Modulo {module_name} non presente o non valido. {e}",
|
||||
exc_info=debug_mode,
|
||||
)
|
||||
|
||||
if not modulo:
|
||||
logger.error(f"Nessun modulo trovato {module_names}")
|
||||
@@ -125,64 +110,11 @@ async def load_csv(record: tuple, cfg: object, pool) -> bool:
|
||||
logger.info(f"Elaborazione completata per ID={id}")
|
||||
return True
|
||||
|
||||
|
||||
async def main():
|
||||
"""Main function: avvia i worker e gestisce il ciclo principale."""
|
||||
logger.info("Avvio del sistema...")
|
||||
"""Funzione principale che avvia il load_orchestrator."""
|
||||
await run_orchestrator(setting.Config, worker)
|
||||
|
||||
cfg = setting.Config()
|
||||
logger.info("Configurazione caricata correttamente")
|
||||
|
||||
try:
|
||||
# Configura il logging globale
|
||||
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
|
||||
debug_mode = (logging.getLogger().getEffectiveLevel() == logging.DEBUG)
|
||||
|
||||
# Configura il logging con il formatter personalizzato
|
||||
handler = logging.FileHandler(cfg.logfilename)
|
||||
formatter = WorkerFormatter(
|
||||
"%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s"
|
||||
)
|
||||
handler.setFormatter(formatter)
|
||||
|
||||
# Rimuovi eventuali handler esistenti e aggiungi il nostro
|
||||
logger.handlers.clear()
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(getattr(logging, log_level))
|
||||
|
||||
logger.info("Logging configurato correttamente")
|
||||
|
||||
# Numero massimo di worker concorrenti
|
||||
logger.info(f"Avvio di {cfg.max_threads} worker concorrenti")
|
||||
|
||||
pool = await aiomysql.create_pool(
|
||||
host=cfg.dbhost,
|
||||
user=cfg.dbuser,
|
||||
password=cfg.dbpass,
|
||||
db=cfg.dbname,
|
||||
minsize=4,
|
||||
maxsize=cfg.max_threads*4,
|
||||
pool_recycle=3600
|
||||
)
|
||||
|
||||
# Avvia i worker
|
||||
workers = [
|
||||
asyncio.create_task(worker(i, cfg, pool))
|
||||
for i in range(cfg.max_threads)
|
||||
]
|
||||
|
||||
logger.info("Sistema avviato correttamente. In attesa di nuovi task...")
|
||||
|
||||
try:
|
||||
await asyncio.gather(*workers, return_exceptions=debug_mode)
|
||||
finally:
|
||||
pool.close()
|
||||
await pool.wait_closed()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Info: Shutdown richiesto... chiusura in corso")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Errore principale: {e}", exc_info=debug_mode)
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
asyncio.run(main())
|
||||
|
||||
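The removed main() above (config load, WorkerFormatter logging, aiomysql pool, one task per worker) is what the new one-liner `await run_orchestrator(setting.Config, worker)` delegates to. utils/orchestrator_utils.py is not part of this compare, so the following is only a plausible reconstruction of that helper based on the deleted code; names and behaviour are assumptions:

import asyncio
import logging
import os
import aiomysql

async def run_orchestrator(config_cls, worker_fn):
    # Hypothetical equivalent of the removed main()
    cfg = config_cls()
    log_level = os.getenv("LOG_LEVEL", "INFO").upper()
    logging.basicConfig(filename=cfg.logfilename,
                        level=getattr(logging, log_level, logging.INFO))

    pool = await aiomysql.create_pool(
        host=cfg.dbhost, user=cfg.dbuser, password=cfg.dbpass, db=cfg.dbname,
        minsize=4, maxsize=cfg.max_threads * 4, pool_recycle=3600)

    workers = [asyncio.create_task(worker_fn(i, cfg, pool))
               for i in range(cfg.max_threads)]
    try:
        await asyncio.gather(*workers)
    finally:
        pool.close()
        await pool.wait_closed()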
old_script/TS_PiniScript.py (executable file, 2584 lines)
File diff suppressed because one or more lines are too long

old_script/dbconfig.py (executable file, 15 lines)
@@ -0,0 +1,15 @@
from configparser import ConfigParser

def read_db_config(filename='/home/battilo/scripts/config.ini', section='mysql'):
    parser = ConfigParser()
    parser.read(filename)

    db = {}
    if parser.has_section(section):
        items = parser.items(section)
        for item in items:
            db[item[0]] = item[1]
    else:
        raise Exception('{0} not found in the {1} file'.format(section, filename))

    return db
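A typical way the legacy scripts would consume this helper is to unpack the returned dict straight into the MySQL driver; a minimal sketch, assuming the [mysql] section holds host, user, password and database keys:

import mysql.connector
from dbconfig import read_db_config

db = read_db_config()   # e.g. {'host': ..., 'user': ..., 'password': ..., 'database': ...}
conn = mysql.connector.connect(**db)
cur = conn.cursor()
cur.execute("SELECT 1")
print(cur.fetchone())
conn.close()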
@@ -1,546 +0,0 @@
|
||||
import unittest
|
||||
import os
|
||||
import sys
|
||||
from unittest.mock import patch, MagicMock, mock_open, call, ANY
|
||||
from hashlib import sha256
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace # Used to create mock config objects
|
||||
|
||||
# Add the parent directory to sys.path to allow importing FtpCsvReceiver
|
||||
# Adjust this path if your test file is located differently
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
parent_dir = os.path.dirname(script_dir)
|
||||
# If FtpCsvReceiver.py is in the same directory as the test file, you might not need this
|
||||
# If it's in the parent directory (like /home/alex/devel/ASE/), use this:
|
||||
sys.path.insert(0, parent_dir)
|
||||
|
||||
# Now import the components to test
|
||||
# We need to import AFTER modifying sys.path if necessary
|
||||
# Also, mock dependencies BEFORE importing the module that uses them
|
||||
# Mock mysql.connector BEFORE importing FtpCsvReceiver
|
||||
mock_mysql_connector = MagicMock()
|
||||
sys.modules['mysql.connector'] = mock_mysql_connector
|
||||
|
||||
# Mock the custom utils modules as well if they aren't available in the test environment
|
||||
mock_utils_time = MagicMock()
|
||||
mock_utils_config = MagicMock()
|
||||
sys.modules['utils.time'] = mock_utils_time
|
||||
sys.modules['utils.config'] = mock_utils_config
|
||||
# Mock the setting.config() call specifically
|
||||
mock_config_instance = MagicMock()
|
||||
mock_utils_config.set_config.config.return_value = mock_config_instance
|
||||
|
||||
# Mock pyftpdlib classes if needed for specific tests, but often mocking methods is enough
|
||||
# sys.modules['pyftpdlib.handlers'] = MagicMock()
|
||||
# sys.modules['pyftpdlib.authorizers'] = MagicMock()
|
||||
# sys.modules['pyftpdlib.servers'] = MagicMock()
|
||||
|
||||
# Import the module AFTER mocking dependencies
|
||||
import ftp_csv_receiver
|
||||
from ftp_csv_receiver import (
|
||||
extract_value,
|
||||
DummySha256Authorizer,
|
||||
ASEHandler,
|
||||
conn_db, # Import even though we mock mysql.connector
|
||||
)
|
||||
|
||||
# --- Test Configuration Setup ---
|
||||
def create_mock_cfg():
|
||||
"""Creates a mock configuration object for testing."""
|
||||
cfg = SimpleNamespace()
|
||||
cfg.adminuser = ['admin', sha256(b'adminpass').hexdigest(), '/fake/admin/path', 'elradfmwMT']
|
||||
cfg.dbhost = 'mockhost'
|
||||
cfg.dbport = 3306
|
||||
cfg.dbuser = 'mockuser'
|
||||
cfg.dbpass = 'mockpass'
|
||||
cfg.dbname = 'mockdb'
|
||||
cfg.dbusertable = 'mock_virtusers'
|
||||
cfg.dbrectable = 'mock_received'
|
||||
cfg.virtpath = '/fake/ftp/root/'
|
||||
cfg.defperm = 'elmw'
|
||||
cfg.fileext = ['.CSV', '.TXT']
|
||||
# Add patterns as lists of strings
|
||||
cfg.units_name = [r'ID\d{4}', r'IX\d{4}']
|
||||
cfg.units_type = [r'G801', r'G201']
|
||||
cfg.tools_name = [r'LOC\d{4}', r'DT\d{4}']
|
||||
cfg.tools_type = [r'MUX', r'MUMS']
|
||||
# Add other necessary config values
|
||||
cfg.logfilename = 'test_ftp.log'
|
||||
cfg.proxyaddr = '0.0.0.0'
|
||||
cfg.firstport = 40000
|
||||
cfg.portrangewidth = 10
|
||||
return cfg
|
||||
|
||||
# --- Test Cases ---
|
||||
|
||||
class TestExtractValue(unittest.TestCase):
|
||||
|
||||
def test_extract_from_primary(self):
|
||||
patterns = [r'ID(\d+)']
|
||||
primary = "File_ID1234_data.csv"
|
||||
secondary = "Some other text"
|
||||
self.assertEqual(extract_value(patterns, primary, secondary), "ID1234")
|
||||
|
||||
def test_extract_from_secondary(self):
|
||||
patterns = [r'Type(A|B)']
|
||||
primary = "Filename_without_type.txt"
|
||||
secondary = "Log data: TypeB found"
|
||||
self.assertEqual(extract_value(patterns, primary, secondary), "TypeB")
|
||||
|
||||
def test_no_match(self):
|
||||
patterns = [r'XYZ\d+']
|
||||
primary = "File_ID1234_data.csv"
|
||||
secondary = "Log data: TypeB found"
|
||||
self.assertEqual(extract_value(patterns, primary, secondary, default="NotFound"), "NotFound")
|
||||
|
||||
def test_case_insensitive(self):
|
||||
patterns = [r'id(\d+)']
|
||||
primary = "File_ID1234_data.csv"
|
||||
secondary = "Some other text"
|
||||
self.assertEqual(extract_value(patterns, primary, secondary), "ID1234") # Note: re.findall captures original case
|
||||
|
||||
def test_multiple_patterns(self):
|
||||
patterns = [r'Type(A|B)', r'ID(\d+)']
|
||||
primary = "File_ID1234_data.csv"
|
||||
secondary = "Log data: TypeB found"
|
||||
# Should match the first pattern found in the primary source
|
||||
self.assertEqual(extract_value(patterns, primary, secondary), "ID1234")
|
||||
|
||||
def test_multiple_patterns_secondary_match(self):
|
||||
patterns = [r'XYZ\d+', r'Type(A|B)']
|
||||
primary = "File_ID1234_data.csv"
|
||||
secondary = "Log data: TypeB found"
|
||||
# Should match the second pattern in the secondary source
|
||||
self.assertEqual(extract_value(patterns, primary, secondary), "TypeB")
|
||||
|
||||
|
||||
class TestDummySha256Authorizer(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mock_cfg = create_mock_cfg()
|
||||
# Mock the database connection and cursor
|
||||
self.mock_conn = MagicMock()
|
||||
self.mock_cursor = MagicMock()
|
||||
mock_mysql_connector.connect.return_value = self.mock_conn
|
||||
self.mock_conn.cursor.return_value = self.mock_cursor
|
||||
|
||||
@patch('FtpCsvReceiver.Path') # Mock Path object
|
||||
def test_init_loads_users(self, mock_path_constructor):
|
||||
# Mock Path instance methods
|
||||
mock_path_instance = MagicMock()
|
||||
mock_path_constructor.return_value = mock_path_instance
|
||||
|
||||
# Simulate database result
|
||||
db_users = [
|
||||
('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr'),
|
||||
('user2', sha256(b'pass2').hexdigest(), '/fake/ftp/root/user2', 'elmw'),
|
||||
]
|
||||
self.mock_cursor.fetchall.return_value = db_users
|
||||
|
||||
authorizer = DummySha256Authorizer(self.mock_cfg)
|
||||
|
||||
# Verify DB connection
|
||||
mock_mysql_connector.connect.assert_called_once_with(
|
||||
user=self.mock_cfg.dbuser, password=self.mock_cfg.dbpass,
|
||||
host=self.mock_cfg.dbhost, port=self.mock_cfg.dbport
|
||||
)
|
||||
# Verify query
|
||||
self.mock_cursor.execute.assert_called_once_with(
|
||||
f'SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL'
|
||||
)
|
||||
# Verify admin user added
|
||||
self.assertIn('admin', authorizer.user_table)
|
||||
self.assertEqual(authorizer.user_table['admin']['pwd'], self.mock_cfg.adminuser[1])
|
||||
# Verify DB users added
|
||||
self.assertIn('user1', authorizer.user_table)
|
||||
self.assertEqual(authorizer.user_table['user1']['pwd'], db_users[0][1])
|
||||
self.assertEqual(authorizer.user_table['user1']['home'], db_users[0][2])
|
||||
self.assertEqual(authorizer.user_table['user1']['perm'], db_users[0][3])
|
||||
self.assertIn('user2', authorizer.user_table)
|
||||
# Verify directories were "created"
|
||||
expected_path_calls = [
|
||||
call(self.mock_cfg.virtpath + 'user1'),
|
||||
call(self.mock_cfg.virtpath + 'user2'),
|
||||
]
|
||||
mock_path_constructor.assert_has_calls(expected_path_calls, any_order=True)
|
||||
self.assertEqual(mock_path_instance.mkdir.call_count, 2)
|
||||
mock_path_instance.mkdir.assert_called_with(parents=True, exist_ok=True)
|
||||
|
||||
@patch('FtpCsvReceiver.Path')
|
||||
def test_init_mkdir_exception(self, mock_path_constructor):
|
||||
# Simulate database result
|
||||
db_users = [('user1', sha256(b'pass1').hexdigest(), '/fake/ftp/root/user1', 'elr')]
|
||||
self.mock_cursor.fetchall.return_value = db_users
|
||||
|
||||
# Mock Path to raise an exception
|
||||
mock_path_instance = MagicMock()
|
||||
mock_path_constructor.return_value = mock_path_instance
|
||||
mock_path_instance.mkdir.side_effect = OSError("Permission denied")
|
||||
|
||||
# We expect initialization to continue, but maybe log an error (though the code uses self.responde which isn't available here)
|
||||
# For a unit test, we just check that the user is still added
|
||||
authorizer = DummySha256Authorizer(self.mock_cfg)
|
||||
self.assertIn('user1', authorizer.user_table)
|
||||
mock_path_instance.mkdir.assert_called_once()
|
||||
|
||||
|
||||
def test_validate_authentication_success(self):
|
||||
self.mock_cursor.fetchall.return_value = [] # No DB users for simplicity
|
||||
authorizer = DummySha256Authorizer(self.mock_cfg)
|
||||
# Test admin user
|
||||
authorizer.validate_authentication('admin', 'adminpass', None) # Handler not used in this method
|
||||
|
||||
def test_validate_authentication_wrong_password(self):
|
||||
self.mock_cursor.fetchall.return_value = []
|
||||
authorizer = DummySha256Authorizer(self.mock_cfg)
|
||||
with self.assertRaises(ftp_csv_receiver.AuthenticationFailed):
|
||||
authorizer.validate_authentication('admin', 'wrongpass', None)
|
||||
|
||||
def test_validate_authentication_unknown_user(self):
|
||||
self.mock_cursor.fetchall.return_value = []
|
||||
authorizer = DummySha256Authorizer(self.mock_cfg)
|
||||
with self.assertRaises(ftp_csv_receiver.AuthenticationFailed):
|
||||
authorizer.validate_authentication('unknown', 'somepass', None)
|
||||
|
||||
|
||||
class TestASEHandler(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mock_cfg = create_mock_cfg()
|
||||
self.mock_conn = MagicMock() # Mock FTP connection object
|
||||
self.mock_server = MagicMock() # Mock FTP server object
|
||||
self.mock_authorizer = MagicMock(spec=DummySha256Authorizer) # Mock authorizer
|
||||
|
||||
# Instantiate the handler
|
||||
# We need to manually set cfg and authorizer as done in main()
|
||||
self.handler = ASEHandler(self.mock_conn, self.mock_server)
|
||||
self.handler.cfg = self.mock_cfg
|
||||
self.handler.authorizer = self.mock_authorizer
|
||||
self.handler.respond = MagicMock() # Mock the respond method
|
||||
self.handler.push = MagicMock() # Mock the push method
|
||||
|
||||
# Mock database for handler methods
|
||||
self.mock_db_conn = MagicMock()
|
||||
self.mock_db_cursor = MagicMock()
|
||||
# Patch conn_db globally for this test class
|
||||
self.patcher_conn_db = patch('FtpCsvReceiver.conn_db', return_value=self.mock_db_conn)
|
||||
self.mock_conn_db = self.patcher_conn_db.start()
|
||||
self.mock_db_conn.cursor.return_value = self.mock_db_cursor
|
||||
|
||||
# Mock logging
|
||||
self.patcher_logging = patch('FtpCsvReceiver.logging')
|
||||
self.mock_logging = self.patcher_logging.start()
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
# Stop the patchers
|
||||
self.patcher_conn_db.stop()
|
||||
self.patcher_logging.stop()
|
||||
# Reset mocks if needed between tests (though setUp does this)
|
||||
mock_mysql_connector.reset_mock()
|
||||
|
||||
|
||||
@patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV'))
|
||||
@patch('FtpCsvReceiver.os.path.splitext', return_value=('ID1234_data', '.CSV'))
|
||||
@patch('FtpCsvReceiver.os.stat')
|
||||
@patch('FtpCsvReceiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3')
|
||||
@patch('FtpCsvReceiver.os.remove')
|
||||
@patch('FtpCsvReceiver.extract_value') # Mock extract_value for focused testing
|
||||
def test_on_file_received_success(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split):
|
||||
mock_os_stat.return_value.st_size = 100 # Non-empty file
|
||||
test_file_path = '/fake/ftp/root/user1/ID1234_data.CSV'
|
||||
|
||||
# Setup mock return values for extract_value
|
||||
mock_extract.side_effect = ['ID1234', 'G801', 'LOC5678', 'MUX']
|
||||
|
||||
self.handler.on_file_received(test_file_path)
|
||||
|
||||
# Verify file stats checked
|
||||
mock_os_stat.assert_called_once_with(test_file_path)
|
||||
# Verify file opened
|
||||
mock_file_open.assert_called_once_with(test_file_path, 'r')
|
||||
# Verify path splitting
|
||||
mock_split.assert_called_once_with(test_file_path)
|
||||
mock_splitext.assert_called_once_with('ID1234_data.CSV')
|
||||
# Verify extract_value calls
|
||||
expected_extract_calls = [
|
||||
call(self.mock_cfg.units_name, 'ID1234_data', ANY), # ANY for the lines string
|
||||
call(self.mock_cfg.units_type, 'ID1234_data', ANY),
|
||||
call(self.mock_cfg.tools_name, 'ID1234_data', ANY),
|
||||
call(self.mock_cfg.tools_type, 'ID1234_data', ANY),
|
||||
]
|
||||
mock_extract.assert_has_calls(expected_extract_calls)
|
||||
# Verify DB connection
|
||||
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
|
||||
# Verify DB insert
|
||||
expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbrectable } (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s, %s, %s, %s, %s, %s)"
|
||||
expected_data = ('ID1234_data', 'ID1234', 'G801', 'LOC5678', 'MUX', 'G801,col2,col3\nval1,val2,val3')
|
||||
self.mock_db_cursor.execute.assert_called_once_with(expected_sql, expected_data)
|
||||
self.mock_db_conn.commit.assert_called_once()
|
||||
self.mock_db_conn.close.assert_called_once()
|
||||
# Verify file removed
|
||||
mock_os_remove.assert_called_once_with(test_file_path)
|
||||
# Verify logging
|
||||
self.mock_logging.info.assert_called_with(f'File {test_file_path} loaded: removed.')
|
||||
|
||||
@patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'data.WRONGEXT'))
|
||||
@patch('FtpCsvReceiver.os.path.splitext', return_value=('data', '.WRONGEXT'))
|
||||
@patch('FtpCsvReceiver.os.stat')
|
||||
@patch('FtpCsvReceiver.os.remove')
|
||||
def test_on_file_received_wrong_extension(self, mock_os_remove, mock_os_stat, mock_splitext, mock_split):
|
||||
mock_os_stat.return_value.st_size = 100
|
||||
test_file_path = '/fake/ftp/root/user1/data.WRONGEXT'
|
||||
|
||||
self.handler.on_file_received(test_file_path)
|
||||
|
||||
# Verify only stat, split, and splitext were called
|
||||
mock_os_stat.assert_called_once_with(test_file_path)
|
||||
mock_split.assert_called_once_with(test_file_path)
|
||||
mock_splitext.assert_called_once_with('data.WRONGEXT')
|
||||
# Verify DB, open, remove were NOT called
|
||||
self.mock_conn_db.assert_not_called()
|
||||
mock_os_remove.assert_not_called()
|
||||
self.mock_logging.info.assert_not_called() # No logging in this path
|
||||
|
||||
@patch('FtpCsvReceiver.os.stat')
|
||||
@patch('FtpCsvReceiver.os.remove')
|
||||
def test_on_file_received_empty_file(self, mock_os_remove, mock_os_stat):
|
||||
mock_os_stat.return_value.st_size = 0 # Empty file
|
||||
test_file_path = '/fake/ftp/root/user1/empty.CSV'
|
||||
|
||||
self.handler.on_file_received(test_file_path)
|
||||
|
||||
# Verify stat called
|
||||
mock_os_stat.assert_called_once_with(test_file_path)
|
||||
# Verify file removed
|
||||
mock_os_remove.assert_called_once_with(test_file_path)
|
||||
# Verify logging
|
||||
self.mock_logging.info.assert_called_with(f'File {test_file_path} was empty: removed.')
|
||||
# Verify DB not called
|
||||
self.mock_conn_db.assert_not_called()
|
||||
|
||||
@patch('FtpCsvReceiver.os.path.split', return_value=('/fake/ftp/root/user1', 'ID1234_data.CSV'))
|
||||
@patch('FtpCsvReceiver.os.path.splitext', return_value=('ID1234_data', '.CSV'))
|
||||
@patch('FtpCsvReceiver.os.stat')
|
||||
@patch('FtpCsvReceiver.open', new_callable=mock_open, read_data='G801,col2,col3\nval1,val2,val3')
|
||||
@patch('FtpCsvReceiver.os.remove')
|
||||
@patch('FtpCsvReceiver.extract_value', side_effect=['ID1234', 'G801', 'LOC5678', 'MUX'])
|
||||
def test_on_file_received_db_error(self, mock_extract, mock_os_remove, mock_file_open, mock_os_stat, mock_splitext, mock_split):
|
||||
mock_os_stat.return_value.st_size = 100
|
||||
test_file_path = '/fake/ftp/root/user1/ID1234_data.CSV'
|
||||
db_error = Exception("DB connection failed")
|
||||
self.mock_db_cursor.execute.side_effect = db_error # Simulate DB error
|
||||
|
||||
self.handler.on_file_received(test_file_path)
|
||||
|
||||
# Verify DB interaction attempted
|
||||
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
|
||||
self.mock_db_cursor.execute.assert_called_once()
|
||||
# Verify commit/close not called after error
|
||||
self.mock_db_conn.commit.assert_not_called()
|
||||
self.mock_db_conn.close.assert_not_called() # Should close be called in finally? Original code doesn't.
|
||||
# Verify file was NOT removed
|
||||
mock_os_remove.assert_not_called()
|
||||
# Verify error logging
|
||||
self.mock_logging.error.assert_any_call(f'File {test_file_path} not loaded. Held in user path.')
|
||||
self.mock_logging.error.assert_any_call(f'{db_error}')
|
||||
|
||||
@patch('FtpCsvReceiver.os.remove')
|
||||
def test_on_incomplete_file_received(self, mock_os_remove):
|
||||
test_file_path = '/fake/ftp/root/user1/incomplete.part'
|
||||
self.handler.on_incomplete_file_received(test_file_path)
|
||||
mock_os_remove.assert_called_once_with(test_file_path)
|
||||
|
||||
@patch('FtpCsvReceiver.Path')
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='newuser')
|
||||
def test_ftp_SITE_ADDU_success(self, mock_basename, mock_path_constructor):
|
||||
mock_path_instance = MagicMock()
|
||||
mock_path_constructor.return_value = mock_path_instance
|
||||
password = 'newpassword'
|
||||
expected_hash = sha256(password.encode("UTF-8")).hexdigest()
|
||||
expected_home = self.mock_cfg.virtpath + 'newuser'
|
||||
|
||||
self.handler.ftp_SITE_ADDU(f'newuser {password}')
|
||||
|
||||
# Verify path creation
|
||||
mock_path_constructor.assert_called_once_with(expected_home)
|
||||
mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True)
|
||||
# Verify authorizer call
|
||||
self.handler.authorizer.add_user.assert_called_once_with(
|
||||
'newuser', expected_hash, expected_home + '/', perm=self.mock_cfg.defperm # Note: Original code adds trailing slash here
|
||||
)
|
||||
# Verify DB interaction
|
||||
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
|
||||
expected_sql = f"INSERT INTO {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('newuser', '{expected_hash}', '{expected_home}', '{self.mock_cfg.defperm}')"
|
||||
self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
|
||||
self.mock_db_conn.commit.assert_called_once()
|
||||
self.mock_db_conn.close.assert_called_once()
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with('200 SITE ADDU successful.')
|
||||
# Verify logging
|
||||
self.mock_logging.info.assert_called_with('User newuser created.')
|
||||
|
||||
def test_ftp_SITE_ADDU_missing_args(self):
|
||||
self.handler.ftp_SITE_ADDU('newuser') # Missing password
|
||||
self.handler.respond.assert_called_once_with('501 SITE ADDU failed. Command needs 2 arguments')
|
||||
self.handler.authorizer.add_user.assert_not_called()
|
||||
self.mock_conn_db.assert_not_called()
|
||||
|
||||
@patch('FtpCsvReceiver.Path')
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='newuser')
|
||||
def test_ftp_SITE_ADDU_mkdir_error(self, mock_basename, mock_path_constructor):
|
||||
mock_path_instance = MagicMock()
|
||||
mock_path_constructor.return_value = mock_path_instance
|
||||
error = OSError("Cannot create dir")
|
||||
mock_path_instance.mkdir.side_effect = error
|
||||
|
||||
self.handler.ftp_SITE_ADDU('newuser newpassword')
|
||||
|
||||
self.handler.respond.assert_called_once_with(f'551 Error in create virtual user path: {error}')
|
||||
self.handler.authorizer.add_user.assert_not_called()
|
||||
self.mock_conn_db.assert_not_called()
|
||||
|
||||
@patch('FtpCsvReceiver.Path')
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='newuser')
|
||||
def test_ftp_SITE_ADDU_db_error(self, mock_basename, mock_path_constructor):
|
||||
mock_path_instance = MagicMock()
|
||||
mock_path_constructor.return_value = mock_path_instance
|
||||
error = Exception("DB insert failed")
|
||||
self.mock_db_cursor.execute.side_effect = error
|
||||
|
||||
self.handler.ftp_SITE_ADDU('newuser newpassword')
|
||||
|
||||
# Verify mkdir called
|
||||
mock_path_instance.mkdir.assert_called_once()
|
||||
# Verify authorizer called (happens before DB)
|
||||
self.handler.authorizer.add_user.assert_called_once()
|
||||
# Verify DB interaction attempted
|
||||
self.mock_conn_db.assert_called_once()
|
||||
self.mock_db_cursor.execute.assert_called_once()
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with(f'501 SITE ADDU failed: {error}.')
|
||||
|
||||
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='olduser')
|
||||
def test_ftp_SITE_DELU_success(self, mock_basename):
|
||||
self.handler.ftp_SITE_DELU('olduser')
|
||||
|
||||
# Verify authorizer call
|
||||
self.handler.authorizer.remove_user.assert_called_once_with('olduser')
|
||||
# Verify DB interaction
|
||||
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
|
||||
expected_sql = f"UPDATE {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = now() WHERE ftpuser = 'olduser'"
|
||||
self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
|
||||
self.mock_db_conn.commit.assert_called_once()
|
||||
self.mock_db_conn.close.assert_called_once()
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with('200 SITE DELU successful.')
|
||||
# Verify logging
|
||||
self.mock_logging.info.assert_called_with('User olduser deleted.')
|
||||
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='olduser')
|
||||
def test_ftp_SITE_DELU_error(self, mock_basename):
|
||||
error = Exception("DB update failed")
|
||||
self.mock_db_cursor.execute.side_effect = error
|
||||
|
||||
self.handler.ftp_SITE_DELU('olduser')
|
||||
|
||||
# Verify authorizer call (happens first)
|
||||
self.handler.authorizer.remove_user.assert_called_once_with('olduser')
|
||||
# Verify DB interaction attempted
|
||||
self.mock_conn_db.assert_called_once()
|
||||
self.mock_db_cursor.execute.assert_called_once()
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with('501 SITE DELU failed.')
|
||||
|
||||
@patch('FtpCsvReceiver.Path')
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='restoreme')
|
||||
def test_ftp_SITE_RESU_success(self, mock_basename, mock_path_constructor):
|
||||
mock_path_instance = MagicMock()
|
||||
mock_path_constructor.return_value = mock_path_instance
|
||||
user_data = ('restoreme', 'somehash', '/fake/ftp/root/restoreme', 'elmw')
|
||||
self.mock_db_cursor.fetchone.return_value = user_data
|
||||
|
||||
self.handler.ftp_SITE_RESU('restoreme')
|
||||
|
||||
# Verify DB interaction
|
||||
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
|
||||
expected_update_sql = f"UPDATE {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} SET deleted_at = null WHERE ftpuser = 'restoreme'"
|
||||
expected_select_sql = f"SELECT ftpuser, hash, virtpath, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE ftpuser = 'restoreme'"
|
||||
expected_db_calls = [
|
||||
call(expected_update_sql),
|
||||
call(expected_select_sql)
|
||||
]
|
||||
self.mock_db_cursor.execute.assert_has_calls(expected_db_calls)
|
||||
self.mock_db_conn.commit.assert_called_once() # For the update
|
||||
self.mock_db_cursor.fetchone.assert_called_once()
|
||||
# Verify authorizer call
|
||||
self.handler.authorizer.add_user.assert_called_once_with(*user_data)
|
||||
# Verify path creation
|
||||
mock_path_constructor.assert_called_once_with(self.mock_cfg.virtpath + 'restoreme')
|
||||
mock_path_instance.mkdir.assert_called_once_with(parents=True, exist_ok=True)
|
||||
# Verify DB close
|
||||
self.mock_db_conn.close.assert_called_once()
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with('200 SITE RESU successful.')
|
||||
# Verify logging
|
||||
self.mock_logging.info.assert_called_with('User restoreme restored.')
|
||||
|
||||
@patch('FtpCsvReceiver.os.path.basename', return_value='restoreme')
|
||||
def test_ftp_SITE_RESU_db_error(self, mock_basename):
|
||||
error = Exception("DB fetch failed")
|
||||
# Simulate error on the SELECT statement
|
||||
self.mock_db_cursor.execute.side_effect = [None, error] # First call (UPDATE) ok, second (SELECT) fails
|
||||
|
||||
self.handler.ftp_SITE_RESU('restoreme')
|
||||
|
||||
# Verify DB interaction attempted
|
||||
self.mock_conn_db.assert_called_once()
|
||||
self.assertEqual(self.mock_db_cursor.execute.call_count, 2) # Both UPDATE and SELECT attempted
|
||||
self.mock_db_conn.commit.assert_called_once() # Commit for UPDATE happened
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with('501 SITE RESU failed.')
|
||||
# Verify authorizer not called, mkdir not called
|
||||
self.handler.authorizer.add_user.assert_not_called()
|
||||
|
||||
|
||||
def test_ftp_SITE_LSTU_success(self):
|
||||
user_list_data = [
|
||||
('userA', 'elr'),
|
||||
('userB', 'elmw'),
|
||||
]
|
||||
self.mock_db_cursor.fetchall.return_value = user_list_data
|
||||
|
||||
self.handler.ftp_SITE_LSTU('') # No argument needed
|
||||
|
||||
# Verify DB interaction
|
||||
self.mock_conn_db.assert_called_once_with(self.mock_cfg)
|
||||
expected_sql = f'SELECT ftpuser, perm FROM {self.mock_cfg.dbname}.{self.mock_cfg.dbusertable} WHERE deleted_at IS NULL '
|
||||
self.mock_db_cursor.execute.assert_called_once_with(expected_sql)
|
||||
self.mock_db_cursor.fetchall.assert_called_once()
|
||||
# Verify push calls
|
||||
expected_push_calls = [
|
||||
call("214-The following virtual users are defined:\r\n"),
|
||||
call('Username: userA\tPerms: elr\r\nUsername: userB\tPerms: elmw\r\n')
|
||||
]
|
||||
self.handler.push.assert_has_calls(expected_push_calls)
|
||||
# Verify final response
|
||||
self.handler.respond.assert_called_once_with("214 LSTU SITE command successful.")
|
||||
|
||||
def test_ftp_SITE_LSTU_db_error(self):
|
||||
error = Exception("DB select failed")
|
||||
self.mock_db_cursor.execute.side_effect = error
|
||||
|
||||
self.handler.ftp_SITE_LSTU('')
|
||||
|
||||
# Verify DB interaction attempted
|
||||
self.mock_conn_db.assert_called_once()
|
||||
self.mock_db_cursor.execute.assert_called_once()
|
||||
# Verify response
|
||||
self.handler.respond.assert_called_once_with(f'501 list users failed: {error}')
|
||||
# Verify push not called
|
||||
self.handler.push.assert_not_called()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main(argv=['first-arg-is-ignored'], exit=False)
|
||||
@@ -29,3 +29,10 @@ class Config:
self.dbrawdata = c.get("tables", "rawTableName")
self.dbnodes = c.get("tables", "nodesTableName")

# Matlab
self.matlab_runtime = c.get("matlab", "runtime")
self.matlab_func_path = c.get("matlab", "func_path")
self.matlab_timeout = c.getint("matlab", "timeout")
self.matlab_error = c.get("matlab", "error")
self.matlab_error_path = c.get("matlab", "error_path")
@@ -8,7 +8,17 @@ from itertools import islice
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def get_data(cfg: object, id: int, pool) -> tuple:
|
||||
async def get_data(cfg: object, id: int, pool: object) -> tuple:
|
||||
"""
|
||||
Retrieves unit name, tool name, and tool data for a given record ID from the database.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object containing database table name.
|
||||
id (int): The ID of the record to retrieve.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
tuple: A tuple containing unit_name, tool_name, and tool_data.
|
||||
"""
|
||||
async with pool.acquire() as conn:
|
||||
async with conn.cursor() as cur:
|
||||
await cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbrectable} where id = {id}')
|
||||
@@ -16,7 +26,17 @@ async def get_data(cfg: object, id: int, pool) -> tuple:
|
||||
|
||||
return unit_name, tool_name, tool_data
|
||||
|
||||
async def make_pipe_sep_matrix(cfg: object, id: int, pool) -> list:
|
||||
async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list:
|
||||
"""
|
||||
Processes pipe-separated data from a CSV record into a structured matrix.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
list: A list of lists, where each inner list represents a row in the matrix.
|
||||
"""
|
||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||
righe = ToolData.splitlines()
|
||||
matrice_valori = []
|
||||
@@ -38,7 +58,17 @@ async def make_pipe_sep_matrix(cfg: object, id: int, pool) -> list:
|
||||
|
||||
return matrice_valori
|
||||
|
||||
async def make_ain_din_matrix(cfg: object, id: int, pool) -> list:
|
||||
async def make_ain_din_matrix(cfg: object, id: int, pool: object) -> list:
|
||||
"""
|
||||
Processes analog and digital input data from a CSV record into a structured matrix.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
list: A list of lists, where each inner list represents a row in the matrix.
|
||||
"""
|
||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||
node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
|
||||
righe = ToolData.splitlines()
|
||||
@@ -62,7 +92,17 @@ async def make_ain_din_matrix(cfg: object, id: int, pool) -> list:
|
||||
|
||||
return matrice_valori
|
||||
|
||||
async def make_channels_matrix(cfg: object, id: int, pool) -> list:
|
||||
async def make_channels_matrix(cfg: object, id: int, pool: object) -> list:
|
||||
"""
|
||||
Processes channel-based data from a CSV record into a structured matrix.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
list: A list of lists, where each inner list represents a row in the matrix.
|
||||
"""
|
||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||
node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
|
||||
righe = ToolData.splitlines()
|
||||
@@ -80,7 +120,17 @@ async def make_channels_matrix(cfg: object, id: int, pool) -> list:
|
||||
|
||||
return matrice_valori
|
||||
|
||||
async def make_musa_matrix(cfg: object, id: int, pool) -> list:
|
||||
async def make_musa_matrix(cfg: object, id: int, pool: object) -> list:
|
||||
"""
|
||||
Processes 'Musa' specific data from a CSV record into a structured matrix.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
list: A list of lists, where each inner list represents a row in the matrix.
|
||||
"""
|
||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||
node_channels, node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
|
||||
righe = ToolData.splitlines()
|
||||
@@ -103,7 +153,17 @@ async def make_musa_matrix(cfg: object, id: int, pool) -> list:
|
||||
return matrice_valori
|
||||
|
||||
|
||||
async def make_tlp_matrix(cfg: object, id: int, pool) -> list:
|
||||
async def make_tlp_matrix(cfg: object, id: int, pool: object) -> list:
|
||||
"""
|
||||
Processes 'TLP' specific data from a CSV record into a structured matrix.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
list: A list of lists, where each inner list represents a row in the matrix.
|
||||
"""
|
||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||
righe = ToolData.splitlines()
|
||||
valori_x_nodo = 2
|
||||
@@ -120,7 +180,17 @@ async def make_tlp_matrix(cfg: object, id: int, pool) -> list:
|
||||
|
||||
|
||||
|
||||
async def make_gd_matrix(cfg: object, id: int, pool) -> list:
|
||||
async def make_gd_matrix(cfg: object, id: int, pool: object) -> list:
|
||||
"""
|
||||
Processes 'GD' specific data from a CSV record into a structured matrix.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
list: A list of lists, where each inner list represents a row in the matrix.
|
||||
"""
|
||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||
righe = ToolData.splitlines()
|
||||
matrice_valori = []
|
||||
|
||||
@@ -6,7 +6,16 @@ import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool, action: str) -> None:
|
||||
async def main_loader(cfg: object, id: int, pool: object, action: str) -> None:
|
||||
"""
|
||||
Main loader function to process CSV data based on the specified action.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record to process.
|
||||
pool (object): The database connection pool.
|
||||
action (str): The type of data processing to perform (e.g., "pipe_separator", "analogic_digital").
|
||||
"""
|
||||
type_matrix_mapping = {
|
||||
"pipe_separator": make_pipe_sep_matrix,
|
||||
"analogic_digital": make_ain_din_matrix,
|
||||
@@ -27,3 +36,39 @@ async def main_loader(cfg: object, id: int, pool, action: str) -> None:
|
||||
await unlock(cfg, id, pool)
|
||||
else:
|
||||
logger.warning(f"Action '{action}' non riconosciuta.")
|
||||
|
||||
|
||||
async def get_next_csv_atomic(pool, table_name, status):
|
||||
"""Preleva atomicamente il prossimo CSV da elaborare"""
|
||||
async with pool.acquire() as conn:
|
||||
# IMPORTANTE: Disabilita autocommit per questa transazione
|
||||
await conn.begin()
|
||||
|
||||
try:
|
||||
async with conn.cursor() as cur:
|
||||
# Usa SELECT FOR UPDATE per lock atomico
|
||||
await cur.execute(f"""
|
||||
SELECT id, unit_type, tool_type, unit_name, tool_name
|
||||
FROM {table_name}
|
||||
WHERE locked = 0 AND status = %s
|
||||
ORDER BY id
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
""", (status,))
|
||||
|
||||
result = await cur.fetchone()
|
||||
if result:
|
||||
await cur.execute(f"""
|
||||
UPDATE {table_name}
|
||||
SET locked = 1
|
||||
WHERE id = %s
|
||||
""", (result[0],))
|
||||
|
||||
# Commit esplicito per rilasciare il lock
|
||||
await conn.commit()
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
# Rollback in caso di errore
|
||||
await conn.rollback()
|
||||
raise e
|
||||
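Because the SELECT runs inside an explicit transaction with FOR UPDATE SKIP LOCKED, a row claimed by one worker is invisible to concurrent workers polling the same table, and the UPDATE locked = 1 is committed before the row is handed back. A minimal sketch of a caller, reusing the worker arguments from the diffs above:

from utils.database import CSV_RECEIVED
from utils.csv.loaders import get_next_csv_atomic

async def claim_next(cfg, pool):
    # Returns the claimed 5-tuple (already marked locked = 1) or None
    record = await get_next_csv_atomic(pool, cfg.dbrectable, CSV_RECEIVED)
    if record is None:
        return None
    id, unit_type, tool_type, unit_name, tool_name = record
    return id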
@@ -1,9 +1,7 @@
import re

def extract_value(patterns: list, primary_source: str, secondary_source: str, default='Not Defined') -> str:
"""Extracts the first match for a list of patterns from the primary source.
Falls back to the secondary source if no match is found.
"""

for source in (primary_source, secondary_source):
for pattern in patterns:
matches = re.findall(pattern, source, re.IGNORECASE)
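The removed test file earlier in this compare documents the intended behaviour: each pattern is tried against the filename first, then against the file contents, and the default is returned when nothing matches. For example (expected results shown as comments):

patterns = [r'ID\d{4}', r'IX\d{4}']

extract_value(patterns, 'File_ID1234_data', 'G801,col2,col3')    # 'ID1234'
extract_value(patterns, 'no_unit_here', 'line with IX9999')      # 'IX9999'
extract_value(patterns, 'no_unit_here', 'no match either')       # 'Not Defined'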
@@ -1,5 +1,6 @@
import logging
import mysql.connector
from mysql.connector import Error

logger = logging.getLogger(__name__)

@@ -20,10 +21,14 @@ def connetti_db(cfg: object) -> object:
A MySQL connection object if the connection is successful, otherwise None.
"""
try:
conn = mysql.connector.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport, database=cfg.dbname)
conn = mysql.connector.connect(user=cfg.dbuser,
password=cfg.dbpass,
host=cfg.dbhost,
port=cfg.dbport,
database=cfg.dbname)
conn.autocommit = True
logger.info("Connected")
return conn
except mysql.connector.Error as e:
except Error as e:
logger.error(f"Database connection error: {e}")
raise  # Re-raise the exception to be handled by the caller
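connetti_db enables autocommit and re-raises connection errors, so callers only deal with queries; the synchronous orchestrator loop in the first diff uses it as a context manager. A minimal sketch of that pattern (the query is illustrative):

from utils.database.connection import connetti_db

def fetch_one_pending(cfg, status):
    # Mirrors the usage in the orchestrator loop above
    with connetti_db(cfg) as conn:
        cur = conn.cursor()
        cur.execute(
            f"select id, unit_name, tool_name from {cfg.dbname}.{cfg.dbrectable} "
            "where locked = 0 and status = %s limit 1", (status,))
        return cur.fetchone()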
@@ -4,14 +4,28 @@ import asyncio
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
timestamp_cols = ['inserted_at', 'loaded_at', 'elaborated_at']
|
||||
timestamp_cols = ["inserted_at", "loaded_at", "elaborated_at"]
|
||||
|
||||
|
||||
async def load_data(cfg: object, matrice_valori: list, pool) -> bool :
|
||||
async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
|
||||
"""Carica una lista di record di dati grezzi nel database.
|
||||
|
||||
Esegue un'operazione di inserimento massivo (executemany) per caricare i dati.
|
||||
Utilizza la clausola 'ON DUPLICATE KEY UPDATE' per aggiornare i record esistenti.
|
||||
Implementa una logica di re-tentativo in caso di deadlock.
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione contenente i nomi delle tabelle e i parametri di re-tentativo.
|
||||
matrice_valori (list): Una lista di tuple, dove ogni tupla rappresenta una riga da inserire.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
|
||||
Returns:
|
||||
bool: True se il caricamento ha avuto successo, False altrimenti.
|
||||
"""
|
||||
if not matrice_valori:
|
||||
logger.info("Nulla da caricare.")
|
||||
return True
|
||||
sql_insert_RAWDATA = f'''
|
||||
sql_insert_RAWDATA = f"""
|
||||
INSERT INTO {cfg.dbrawdata} (
|
||||
`UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`,
|
||||
`Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`,
|
||||
@@ -47,7 +61,7 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool :
|
||||
`TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, {cfg.dbrawdata}.`TemperatureModule`),
|
||||
`RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`),
|
||||
`Created_at` = NOW()
|
||||
'''
|
||||
"""
|
||||
|
||||
rc = False
|
||||
async with pool.acquire() as conn:
|
||||
@@ -65,10 +79,12 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool :
|
||||
logging.error(f"Error: {e}.")
|
||||
|
||||
if e.args[0] == 1213: # Deadlock detected
|
||||
logging.warning(f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}")
|
||||
logging.warning(
|
||||
f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}"
|
||||
)
|
||||
|
||||
if attempt < cfg.max_retries - 1:
|
||||
delay = (2 * attempt)
|
||||
delay = 2 * attempt
|
||||
await asyncio.sleep(delay)
|
||||
continue
|
||||
else:
|
||||
@@ -76,29 +92,64 @@ async def load_data(cfg: object, matrice_valori: list, pool) -> bool :
|
||||
raise
|
||||
return rc
|
||||
|
||||
async def update_status(cfg: object, id: int, status: int, pool) -> None:
async def update_status(cfg: object, id: int, status: int, pool: object) -> None:
    """Updates the status of a record in the CSV records table.

    Args:
        cfg (object): The configuration object containing the table name.
        id (int): The ID of the record to update.
        status (int): The new status to set.
        pool (object): The database connection pool.
    """
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            try:
                await cur.execute(f'update {cfg.dbrectable} set status = {status}, {timestamp_cols[status]} = now() where id = {id}')
                await cur.execute(
                    f"update {cfg.dbrectable} set status = {status}, {timestamp_cols[status]} = now() where id = {id}"
                )
                await conn.commit()
                logging.info("Status updated.")
            except Exception as e:
                await conn.rollback()
                logging.error(f'Error: {e}')
                logging.error(f"Error: {e}")

async def unlock(cfg: object, id: int, pool) -> None:
async def unlock(cfg: object, id: int, pool: object) -> None:
    """Unlocks a record in the CSV records table.

    Sets the 'locked' field to 0 for a given ID.

    Args:
        cfg (object): The configuration object containing the table name.
        id (int): The ID of the record to unlock.
        pool (object): The database connection pool.
    """
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            try:
                await cur.execute(f'update {cfg.dbrectable} set locked = 0 where id = {id}')
                await cur.execute(
                    f"update {cfg.dbrectable} set locked = 0 where id = {id}"
                )
                await conn.commit()
                logging.info(f"id {id} unlocked.")
            except Exception as e:
                await conn.rollback()
                logging.error(f'Error: {e}')
                logging.error(f"Error: {e}")

async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool) -> tuple:
async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool: object) -> tuple:
    """Retrieves the information needed to run a Matlab command from the database.

    Args:
        cfg (object): The configuration object.
        unit (str): The unit name.
        tool (str): The tool name.
        pool (object): The database connection pool.

    Returns:
        tuple: A tuple containing the Matlab command information, or None on error.
    """
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            try:
@@ -110,4 +161,4 @@ async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool) -> tuple:
                    where t.name = "{tool}" and u.name = "{unit}"''')
                return await cur.fetchone()
            except Exception as e:
                logging.error(f'Error: {e}')
                logging.error(f"Error: {e}")
utils/database/matlab_query.py (Normal file, 39 lines)
@@ -0,0 +1,39 @@
import logging
import aiomysql

logger = logging.getLogger(__name__)

async def get_matlab_command(cfg: object, tool: str, unit: str, pool: object) -> tuple:
    """Retrieves the information needed to run a Matlab command from the database.

    Queries the database for the details required to start a Matlab script,
    based on the tool name (tool) and the unit name (unit).

    Args:
        cfg (object): The configuration object.
        tool (str): The tool name.
        unit (str): The unit name.
        pool (object): The database connection pool.

    Returns:
        tuple: A tuple containing the Matlab command information,
            or None if no command is found.
    """

    async with pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(f"""
                SELECT m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate from matfuncs as m
                INNER JOIN tools as t on t.matfunc = m.id
                INNER JOIN units as u on u.id = t.unit_id
                INNER JOIN statustools as s on t.statustool_id = s.id
                where t.name = '{tool}' AND u.name = '{unit}';
            """)

            result = await cur.fetchone()

            if not result:
                logger.error(f"{unit} - {tool}: Matlab command not found.")
                return None
            else:
                return result
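A minimal usage sketch (the tool and unit names are placeholders; with aiomysql.DictCursor the returned row behaves like a dict keyed by column alias):

info = await get_matlab_command(cfg, "tool_01", "unit_01", pool)
if info:
    # e.g. build the shell command from the Matlab entry point name
    print(info["matcall"], info["statustools"])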
@@ -1,37 +1,46 @@
from utils.database.connection import connetti_db

import aiomysql
import logging

logger = logging.getLogger(__name__)

def get_nodes_type(cfg: object, tool: str, unit: str) -> tuple:
async def get_nodes_type(cfg: object, tool: str, unit: str, pool: object) -> tuple:
    """Retrieves the node information (type, channels, inputs) for a given tool and unit.

    with connetti_db(cfg) as conn:
        cur = conn.cursor(dictionary=True)
        query = f"""
            SELECT t.name AS name, n.seq AS seq, n.num AS num, n.channels AS channels, y.type AS type, n.ain AS ain, n.din AS din
            FROM {cfg.dbname}.{cfg.dbnodes} AS n
            INNER JOIN tools AS t ON t.id = n.tool_id
            INNER JOIN units AS u ON u.id = t.unit_id
            INNER JOIN nodetypes AS y ON n.nodetype_id = y.id
            WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}'
            ORDER BY n.num;
        """
        #logger.info(f"{unit} - {tool}: Executing query: {query}")
        cur.execute(query)
        results = cur.fetchall()
        logger.info(f"{unit} - {tool}: {cur.rowcount} rows selected to get node type/Ain/Din/channels.")
        cur.close()
        conn.close()
    Args:
        cfg (object): The configuration object.
        tool (str): The tool name.
        unit (str): The unit name.
        pool (object): The database connection pool.

    if not results:
        logger.info(f"{unit} - {tool}: Node/Channels/Ain/Din not defined.")
        return None, None, None, None
    else:
        channels, types, ains, dins = [], [], [], []
        for row in results:
            channels.append(row['channels'])
            types.append(row['type'])
            ains.append(row['ain'])
            dins.append(row['din'])
        return channels, types, ains, dins
    Returns:
        tuple: A tuple containing four lists: channels, types, ain, din.
            If no results are found, returns (None, None, None, None).
    """

    async with pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(f"""
                SELECT t.name AS name, n.seq AS seq, n.num AS num, n.channels AS channels, y.type AS type, n.ain AS ain, n.din AS din
                FROM {cfg.dbname}.{cfg.dbnodes} AS n
                INNER JOIN tools AS t ON t.id = n.tool_id
                INNER JOIN units AS u ON u.id = t.unit_id
                INNER JOIN nodetypes AS y ON n.nodetype_id = y.id
                WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}'
                ORDER BY n.num;
            """)

            results = await cur.fetchall()
            logger.info(f"{unit} - {tool}: {cur.rowcount} rows selected to get node type/Ain/Din/channels.")

            if not results:
                logger.info(f"{unit} - {tool}: Node/Channels/Ain/Din not defined.")
                return None, None, None, None
            else:
                channels, types, ains, dins = [], [], [], []
                for row in results:
                    channels.append(row['channels'])
                    types.append(row['type'])
                    ains.append(row['ain'])
                    dins.append(row['din'])
                return channels, types, ains, dins
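An illustrative usage sketch of the async version (the tool and unit names are placeholders):

channels, types, ains, dins = await get_nodes_type(cfg, "tool_01", "unit_01", pool)
if channels is not None:
    # The four lists are positionally aligned: entry i describes node i.
    for ch, ty, ain, din in zip(channels, types, ains, dins):
        print(f"type={ty} channels={ch} ain={ain} din={din}")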
@@ -10,7 +10,11 @@ from utils.database.connection import connetti_db
logger = logging.getLogger(__name__)

def ftp_SITE_ADDU(self: object, line: str) -> None:
    """Adds a virtual user, creates their directory, and saves their details to the database.
    """
    """
    Adds a virtual user, creates their directory, and saves their details to the database.

    Args:
        line (str): A string containing the username and password separated by a space.
    """
    cfg = self.cfg
    try:
@@ -51,7 +55,12 @@ def ftp_SITE_ADDU(self: object, line: str) -> None:
        print(e)

def ftp_SITE_DISU(self: object, line: str) -> None:
    """Removes a virtual user from the authorizer and marks them as deleted in the database."""
    """
    Removes a virtual user from the authorizer and marks them as deleted in the database.

    Args:
        line (str): A string containing the username to be disabled.
    """
    cfg = self.cfg
    parms = line.split()
    user = os.path.basename(parms[0])  # Extract the username
@@ -78,7 +87,12 @@ def ftp_SITE_DISU(self: object, line: str) -> None:
        print(e)

def ftp_SITE_ENAU(self: object, line: str) -> None:
    """Restores a virtual user by updating their status in the database and adding them back to the authorizer."""
    """
    Restores a virtual user by updating their status in the database and adding them back to the authorizer.

    Args:
        line (str): A string containing the username to be enabled.
    """
    cfg = self.cfg
    parms = line.split()
    user = os.path.basename(parms[0])  # Extract the username
@@ -117,7 +131,12 @@ def ftp_SITE_ENAU(self: object, line: str) -> None:
        print(e)

def ftp_SITE_LSTU(self: object, line: str) -> None:
    """Lists all virtual users from the database."""
    """
    Lists all virtual users from the database.

    Args:
        line (str): An empty string (no arguments needed for this command).
    """
    cfg = self.cfg
    users_list = []
    try:
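These are server-side handlers for custom FTP SITE commands (pyftpdlib-style ftp_SITE_* methods). Purely as an illustration of how they would be driven from a client, assuming an admin account allowed to issue SITE commands; host and credentials are placeholders:

from ftplib import FTP

ftp = FTP("ftp.example.com")
ftp.login("admin", "admin_password")
ftp.sendcmd("SITE ADDU newuser secretpass")  # add a virtual user
ftp.sendcmd("SITE LSTU")                     # list virtual users
ftp.sendcmd("SITE DISU newuser")             # disable the user again
ftp.quit()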
utils/orchestrator_utils.py (Normal file, 104 lines)
@@ -0,0 +1,104 @@
import logging
import asyncio
import os
import aiomysql
import contextvars
from typing import Callable, Coroutine, Any

# Create a context variable that identifies the worker
worker_context = contextvars.ContextVar("worker_id", default="^-^")


# Custom formatter that includes the worker_id
class WorkerFormatter(logging.Formatter):
    """Custom log formatter that includes the worker ID."""

    def format(self, record: logging.LogRecord) -> str:
        """Formats the log record, including the worker ID.

        Args:
            record (logging.LogRecord): The log record to format.

        Returns:
            str: The formatted string of the log record.
        """
        record.worker_id = worker_context.get()
        return super().format(record)


def setup_logging(log_filename: str, log_level_str: str):
    """Configures the global logging.

    Args:
        log_filename (str): Path of the log file.
        log_level_str (str): Log level (e.g. "INFO", "DEBUG").
    """
    logger = logging.getLogger()
    handler = logging.FileHandler(log_filename)
    formatter = WorkerFormatter(
        "%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s"
    )
    handler.setFormatter(formatter)

    # Remove any existing handlers and add ours
    if logger.hasHandlers():
        logger.handlers.clear()
    logger.addHandler(handler)
    log_level = getattr(logging, log_level_str.upper(), logging.INFO)
    logger.setLevel(log_level)
    logger.info("Logging configured correctly")

async def run_orchestrator(
    config_class: Any,
    worker_coro: Callable[[int, Any, Any], Coroutine[Any, Any, None]],
):
    """Main function that initializes and starts an orchestrator.

    Args:
        config_class: The configuration class to instantiate.
        worker_coro: The worker coroutine to run in parallel.
    """
    logger = logging.getLogger()
    logger.info("Starting the system...")

    cfg = config_class()
    logger.info("Configuration loaded correctly")

    debug_mode = False
    try:
        log_level = os.getenv("LOG_LEVEL", "INFO").upper()
        setup_logging(cfg.logfilename, log_level)
        debug_mode = logger.getEffectiveLevel() == logging.DEBUG

        logger.info(f"Starting {cfg.max_threads} concurrent workers")

        pool = await aiomysql.create_pool(
            host=cfg.dbhost,
            user=cfg.dbuser,
            password=cfg.dbpass,
            db=cfg.dbname,
            minsize=cfg.max_threads,
            maxsize=cfg.max_threads * 4,
            pool_recycle=3600,
        )

        tasks = [
            asyncio.create_task(worker_coro(i, cfg, pool))
            for i in range(cfg.max_threads)
        ]

        logger.info("System started correctly. Waiting for new tasks...")

        try:
            await asyncio.gather(*tasks, return_exceptions=debug_mode)
        finally:
            pool.close()
            await pool.wait_closed()

    except KeyboardInterrupt:
        logger.info("Info: Shutdown requested... closing")

    except Exception as e:
        logger.error(f"Main error: {e}", exc_info=debug_mode)
@@ -1 +1 @@
"""Parser for the data loggers"""
"""Parser for the data loggers with the unit and tool types"""

@@ -1 +1 @@
"""Parser for the data loggers"""
"""Parser for the data loggers with the unit and tool names"""

@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'cr1000x_cr1000x' type.

    This function is a wrapper around `pipe_sep_main_loader` and passes the
    processing type as "pipe_separator".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
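The same thin wrapper repeats below for every 'unit_tool' type, each delegating to one of the shared loaders with a different processing-type string. One plausible way an orchestrator could dispatch to these modules dynamically is sketched here; the package path is an assumption for illustration only:

import importlib

async def dispatch_loader(cfg: object, id: int, pool: object, unit_type: str, tool_type: str) -> None:
    # Hypothetical dispatch: derive the module name from the record's unit/tool type
    # and call that module's main_loader.
    module = importlib.import_module(f"loaders.{unit_type}_{tool_type}")  # assumed layout
    await module.main_loader(cfg, id, pool)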
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'd2w_d2w' type.

    This function is a wrapper around `pipe_sep_main_loader` and passes the
    processing type as "pipe_separator".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as channels_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'g201_g201' type.

    This function is a wrapper around `channels_main_loader` and passes the
    processing type as "channels".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await channels_main_loader(cfg, id, pool, "channels")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'g301_g301' type.

    This function is a wrapper around `pipe_sep_main_loader` and passes the
    processing type as "pipe_separator".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'g801_iptm' type.

    This function is a wrapper around `pipe_sep_main_loader` and passes the
    processing type as "pipe_separator".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as analog_dig_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'g801_loc' type.

    This function is a wrapper around `analog_dig_main_loader` and passes the
    processing type as "analogic_digital".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await analog_dig_main_loader(cfg, id, pool, "analogic_digital")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'g801_mums' type.

    This function is a wrapper around `pipe_sep_main_loader` and passes the
    processing type as "pipe_separator".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as musa_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'g801_musa' type.

    This function is a wrapper around `musa_main_loader` and passes the
    processing type as "musa".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await musa_main_loader(cfg, id, pool, "musa")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as channels_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'g801_mux' type.

    This function is a wrapper around `channels_main_loader` and passes the
    processing type as "channels".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await channels_main_loader(cfg, id, pool, "channels")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'g802_dsas' type.

    This function is a wrapper around `pipe_sep_main_loader` and passes the
    processing type as "pipe_separator".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as gd_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'g802_gd' type.

    This function is a wrapper around `gd_main_loader` and passes the
    processing type as "gd".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await gd_main_loader(cfg, id, pool, "gd")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as analog_dig_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'g802_loc' type.

    This function is a wrapper around `analog_dig_main_loader` and passes the
    processing type as "analogic_digital".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await analog_dig_main_loader(cfg, id, pool, "analogic_digital")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'g802_modb' type.

    This function is a wrapper around `pipe_sep_main_loader` and passes the
    processing type as "pipe_separator".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'g802_mums' type.

    This function is a wrapper around `pipe_sep_main_loader` and passes the
    processing type as "pipe_separator".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as channels_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'g802_mux' type.

    This function is a wrapper around `channels_main_loader` and passes the
    processing type as "channels".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await channels_main_loader(cfg, id, pool, "channels")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as tlp_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'gs1_gs1' type.

    This function is a wrapper around `tlp_main_loader` and passes the
    processing type as "tlp".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await tlp_main_loader(cfg, id, pool, "tlp")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'hortus_hortus' type.

    This function is a wrapper around `pipe_sep_main_loader` and passes the
    processing type as "pipe_separator".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
@@ -1,2 +1,35 @@
async def main_loader(cfg: object, id: int, pool) -> None:
    pass
import subprocess
import tempfile
import os

from utils.database.loader_action import DATA_LOADED, update_status, unlock
from utils.csv.data_preparation import get_data

import logging

logger = logging.getLogger(__name__)

async def main_loader(cfg: object, id: int, pool: object) -> None:

    UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
    # Create a temporary file holding the raw CSV payload
    with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as temp_file:
        temp_file.write(ToolData)
        temp_filename = temp_file.name

    try:
        # Run the legacy script against the temporary file.
        # Note: subprocess.run is synchronous and must not be awaited; it blocks until the script exits.
        result = subprocess.run(['python3', 'old_script/TS_PiniScript.py', temp_filename], capture_output=True, text=True)
        print(result.stdout)
        print(result.stderr)
    finally:
        # Clean up the temporary file
        os.unlink(temp_filename)

    if result.returncode != 0:
        logger.error(f"Error while running TS_PiniScript.py: {result.stderr}")
        raise Exception(f"Error in the program: {result.stderr}")
    else:
        logger.info(f"TS_PiniScript.py executed successfully: {result.stdout}")
        await update_status(cfg, id, DATA_LOADED, pool)
        await unlock(cfg, id, pool)
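Because subprocess.run blocks the event loop while the legacy script runs, a non-blocking variant could rely on asyncio's subprocess API instead; this is only a sketch of that alternative, not the committed code:

import asyncio

async def run_legacy_script(temp_filename: str) -> tuple:
    # Launch the legacy parser without blocking the event loop.
    proc = await asyncio.create_subprocess_exec(
        "python3", "old_script/TS_PiniScript.py", temp_filename,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout.decode(), stderr.decode()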
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as analog_dig_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'tlp_loc' type.

    This function is a wrapper around `analog_dig_main_loader` and passes the
    processing type as "analogic_digital".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await analog_dig_main_loader(cfg, id, pool, "analogic_digital")
@@ -1,4 +1,15 @@
from utils.csv.loaders import main_loader as tlp_main_loader

async def main_loader(cfg: object, id: int, pool) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
    """
    Loads and processes the CSV data specific to the 'tlp_tlp' type.

    This function is a wrapper around `tlp_main_loader` and passes the
    processing type as "tlp".

    Args:
        cfg (object): The configuration object.
        id (int): The ID of the CSV record to process.
        pool (object): The database connection pool.
    """
    await tlp_main_loader(cfg, id, pool, "tlp")