Compare commits


4 Commits

SHA1        Message             Date
35527c89cd  fix ftp             2025-09-15 22:32:12 +02:00
8cd5a21275  fix flag elab       2025-09-15 22:06:01 +02:00
2d2668c92c  setting vscode      2025-09-12 20:54:21 +02:00
adfe2e7809  fix cread user dir  2025-09-12 20:52:11 +02:00
7 changed files with 77 additions and 73 deletions

.vscode/setting.json (vendored, new file, +4)
View File

@@ -0,0 +1,4 @@
+{
+    "flake8.args": ["--max-line-length=140"],
+    "python.linting.flake8Args": ["--config","flake8.cfg"]
+}
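Note: the file sets the 140-character limit through two different keys: "flake8.args" (read by the standalone ms-python.flake8 extension) and "python.linting.flake8Args" (the legacy linting setting of the VS Code Python extension, here pointing at a flake8.cfg instead of passing the limit inline). Presumably the duplication keeps both extension generations consistent.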

View File

@@ -10,7 +10,7 @@ import asyncio
 # Import custom modules for configuration and database connection
 from utils.config import loader_matlab_elab as setting
 from utils.database import WorkflowFlags
-from utils.database.action_query import get_tool_info
+from utils.database.action_query import get_tool_info, check_flag_elab
 from utils.csv.loaders import get_next_csv_atomic
 from utils.orchestrator_utils import run_orchestrator, worker_context
 from utils.database.loader_action import update_status, unlock
@@ -47,61 +47,65 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
     while True:
         try:
             logger.info("Inizio elaborazione")
-            record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED)
-            if record:
-                rec_id, _, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
-                if tool_type.lower() != "gd":  # i tool GD non devono essere elaborati ???
-                    tool_elab_info = await get_tool_info(WorkflowFlags.DATA_ELABORATED, unit_name.upper(), tool_name.upper(), pool)
-                    if tool_elab_info:
-                        if tool_elab_info['statustools'].lower() in cfg.elab_status:
-                            logger.info("Elaborazione ID %s per %s %s", rec_id, unit_name, tool_name)
-                            await update_status(cfg, rec_id, WorkflowFlags.START_ELAB, pool)
-                            matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}"
-                            proc = await asyncio.create_subprocess_shell(
-                                matlab_cmd,
-                                cwd=cfg.matlab_func_path,
-                                stdout=asyncio.subprocess.PIPE,
-                                stderr=asyncio.subprocess.PIPE
-                            )
-                            stdout, stderr = await proc.communicate()
-                            if proc.returncode != 0:
-                                logger.error("Errore durante l'elaborazione")
-                                logger.error(stderr.decode().strip())
-                                if proc.returncode == 124:
-                                    error_type = f"Matlab elab excessive duration: killed after {cfg.matlab_timeout} seconds."
-                                else:
-                                    error_type = f"Matlab elab failed: {proc.returncode}."
-                                # da verificare i log dove prenderli
-                                # with open(f"{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt", "w") as f:
-                                #     f.write(stderr.decode().strip())
-                                # errors = [line for line in stderr.decode().strip() if line.startswith("Error")]
-                                # warnings = [line for line in stderr.decode().strip() if not line.startswith("Error")]
-                                errors, warnings = await read_error_lines_from_logs(cfg.matlab_error_path, f"_{unit_name}_{tool_name}*_*_output_error.txt")
-                                await send_error_email(unit_name.upper(), tool_name.upper(), tool_elab_info['matcall'], error_type, errors, warnings)
-                            else:
-                                logger.info(stdout.decode().strip())
-                            await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
-                            await unlock(cfg, rec_id, pool)
-                            await asyncio.sleep(ELAB_PROCESSING_DELAY)
-                        else:
-                            logger.info("ID %s %s - %s %s: MatLab calc by-passed.", rec_id, unit_name, tool_name, tool_elab_info['statustools'])
-                            await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
-                            await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool)
-                            await unlock(cfg, rec_id, pool)
-                else:
-                    await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
-                    await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool)
-                    await unlock(cfg, rec_id, pool)
-            else:
-                logger.info("Nessun record disponibile")
-                await asyncio.sleep(NO_RECORD_SLEEP)
+            if not await check_flag_elab(pool):
+                record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED)
+                if record:
+                    rec_id, _, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
+                    if tool_type.lower() != "gd":  # i tool GD non devono essere elaborati ???
+                        tool_elab_info = await get_tool_info(WorkflowFlags.DATA_ELABORATED, unit_name.upper(), tool_name.upper(), pool)
+                        if tool_elab_info:
+                            if tool_elab_info['statustools'].lower() in cfg.elab_status:
+                                logger.info("Elaborazione ID %s per %s %s", rec_id, unit_name, tool_name)
+                                await update_status(cfg, rec_id, WorkflowFlags.START_ELAB, pool)
+                                matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}"
+                                proc = await asyncio.create_subprocess_shell(
+                                    matlab_cmd,
+                                    cwd=cfg.matlab_func_path,
+                                    stdout=asyncio.subprocess.PIPE,
+                                    stderr=asyncio.subprocess.PIPE
+                                )
+                                stdout, stderr = await proc.communicate()
+                                if proc.returncode != 0:
+                                    logger.error("Errore durante l'elaborazione")
+                                    logger.error(stderr.decode().strip())
+                                    if proc.returncode == 124:
+                                        error_type = f"Matlab elab excessive duration: killed after {cfg.matlab_timeout} seconds."
+                                    else:
+                                        error_type = f"Matlab elab failed: {proc.returncode}."
+                                    # da verificare i log dove prenderli
+                                    # with open(f"{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt", "w") as f:
+                                    #     f.write(stderr.decode().strip())
+                                    # errors = [line for line in stderr.decode().strip() if line.startswith("Error")]
+                                    # warnings = [line for line in stderr.decode().strip() if not line.startswith("Error")]
+                                    errors, warnings = await read_error_lines_from_logs(cfg.matlab_error_path, f"_{unit_name}_{tool_name}*_*_output_error.txt")
+                                    await send_error_email(unit_name.upper(), tool_name.upper(), tool_elab_info['matcall'], error_type, errors, warnings)
+                                else:
+                                    logger.info(stdout.decode().strip())
+                                await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
+                                await unlock(cfg, rec_id, pool)
+                                await asyncio.sleep(ELAB_PROCESSING_DELAY)
+                            else:
+                                logger.info("ID %s %s - %s %s: MatLab calc by-passed.", rec_id, unit_name, tool_name, tool_elab_info['statustools'])
+                                await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
+                                await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool)
+                                await unlock(cfg, rec_id, pool)
+                    else:
+                        await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
+                        await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool)
+                        await unlock(cfg, rec_id, pool)
+                else:
+                    logger.info("Nessun record disponibile")
+                    await asyncio.sleep(NO_RECORD_SLEEP)
+            else:
+                logger.info("Flag fermo elaborazione attivato")
+                await asyncio.sleep(NO_RECORD_SLEEP)
         except Exception as e:  # pylint: disable=broad-except
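Net effect of the hunk above: the entire record-processing body is now gated on the admin stop flag. A minimal sketch of the pattern, assuming (as the last file in this diff suggests) that check_flag_elab(pool) returns the admin_panel.stop_elab value; gated_loop, process_next_record, and the NO_RECORD_SLEEP value are illustrative, not repo code:

import asyncio

from utils.database.action_query import check_flag_elab

NO_RECORD_SLEEP = 30  # assumed value; the real constant lives in the orchestrator module

async def gated_loop(pool, process_next_record):
    """Run the worker body only while the admin stop flag is clear."""
    while True:
        if not await check_flag_elab(pool):
            # flag clear: pull and process the next record as usual
            await process_next_record()
        else:
            # flag set from the admin panel: idle instead of pulling records
            await asyncio.sleep(NO_RECORD_SLEEP)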

View File

@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
 class DummySha256Authorizer(DummyAuthorizer):
     """Custom authorizer that uses SHA256 for password hashing and manages users from a database."""

-    def __init__(self: object, cfg: object) -> None:
+    def __init__(self: object, cfg: dict) -> None:
         """Initializes the authorizer, adds the admin user, and loads users from the database.

         Args:
@@ -47,13 +47,14 @@ class DummySha256Authorizer(DummyAuthorizer):
         )
         for ftpuser, user_hash, virtpath, perm in cur.fetchall():
-            self.add_user(ftpuser, user_hash, virtpath, perm)
             # Create the user's directory if it does not exist.
             try:
                 Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
+                self.add_user(ftpuser, user_hash, virtpath, perm)
             except Exception as e:  # pylint: disable=broad-except
                 self.responde(f"551 Error in create virtual user path: {e}")

     def validate_authentication(
         self: object, username: str, password: str, handler: object
     ) -> None:
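The second hunk moves add_user inside the try block, after the mkdir call, so an account is only registered once its home directory actually exists. A sketch of the intended ordering (register_user is an illustrative helper, not repo code):

from pathlib import Path

def register_user(authorizer, ftpuser, user_hash, virtpath, perm):
    """Create the home directory first, then register the FTP account."""
    try:
        Path(virtpath).mkdir(parents=True, exist_ok=True)
        # reached only if mkdir succeeded: no account without a home dir
        authorizer.add_user(ftpuser, user_hash, virtpath, perm)
    except OSError as exc:
        # report the failure instead of leaving a half-configured user
        print(f"551 Error in create virtual user path: {exc}")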

View File

@@ -12,7 +12,6 @@ import asyncio
 from utils.config import loader_load_data as setting
 from utils.database import WorkflowFlags
 from utils.csv.loaders import get_next_csv_atomic
-from utils.database.action_query import check_flag_elab
 from utils.orchestrator_utils import run_orchestrator, worker_context

 # Initialize the logger for this module
@@ -24,7 +23,7 @@ CSV_PROCESSING_DELAY = 0.2
 NO_RECORD_SLEEP = 60

-async def worker(worker_id: int, cfg: object, pool: object) -> None:
+async def worker(worker_id: int, cfg: dict, pool: object) -> None:
     """Esegue il ciclo di lavoro per l'elaborazione dei file CSV.

     Il worker preleva un record CSV dal database, ne elabora il contenuto
@@ -32,7 +31,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
     Args:
         worker_id (int): L'ID univoco del worker.
-        cfg (object): L'oggetto di configurazione.
+        cfg (dict): L'oggetto di configurazione.
         pool (object): Il pool di connessioni al database.
     """
     # Imposta il context per questo worker
@@ -43,24 +42,20 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
     while True:
         try:
             logger.info("Inizio elaborazione")
-            if not await check_flag_elab():
-                record = await get_next_csv_atomic(
-                    pool,
-                    cfg.dbrectable,
-                    WorkflowFlags.CSV_RECEIVED,
-                    WorkflowFlags.DATA_LOADED,
-                )
-                if record:
-                    success = await load_csv(record, cfg, pool)
-                    if not success:
-                        logger.error("Errore durante l'elaborazione")
-                    await asyncio.sleep(CSV_PROCESSING_DELAY)
-                else:
-                    logger.info("Nessun record disponibile")
-                    await asyncio.sleep(NO_RECORD_SLEEP)
-            else:
-                logger.info("Flag fermo elaborazione attivato")
-                await asyncio.sleep(NO_RECORD_SLEEP)
+            record = await get_next_csv_atomic(
+                pool,
+                cfg.dbrectable,
+                WorkflowFlags.CSV_RECEIVED,
+                WorkflowFlags.DATA_LOADED,
+            )
+            if record:
+                success = await load_csv(record, cfg, pool)
+                if not success:
+                    logger.error("Errore durante l'elaborazione")
+                await asyncio.sleep(CSV_PROCESSING_DELAY)
+            else:
+                logger.info("Nessun record disponibile")
+                await asyncio.sleep(NO_RECORD_SLEEP)
         except Exception as e:  # pylint: disable=broad-except
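Together with the elaboration orchestrator above, this hunk moves the stop-flag gate out of the CSV loader: records keep being loaded while the flag only pauses the MATLAB elaboration stage. It also retires the old call site, which invoked check_flag_elab() without the pool argument the function requires.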

View File

@@ -28,7 +28,7 @@ ELAB_PROCESSING_DELAY = 0.2
 NO_RECORD_SLEEP = 30

-async def worker(worker_id: int, cfg: object, pool: object) -> None:
+async def worker(worker_id: int, cfg: dict, pool: object) -> None:
     """Esegue il ciclo di lavoro per l'invio dei dati.

     Il worker preleva un record dal database che indica dati pronti per
@@ -37,7 +37,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
     Args:
         worker_id (int): L'ID univoco del worker.
-        cfg (object): L'oggetto di configurazione.
+        cfg (dict): L'oggetto di configurazione.
         pool (object): Il pool di connessioni al database.
     """

View File

@@ -31,7 +31,7 @@ def on_file_received(self: object, file: str) -> None:
         new_filename = f"{filename}_{timestamp}{fileExtension}"
         os.rename(file, f"{path}/{new_filename}")
         if (fileExtension.upper() in (cfg.fileext)):
-            with open(file, 'r', encoding='utf-8', errors='ignore') as csvfile:
+            with open(f"{path}/{new_filename}", 'r', encoding='utf-8', errors='ignore') as csvfile:
                 lines = csvfile.readlines()
                 unit_name = extract_value(cfg.units_name, filename, str(lines[0:10]))
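This fixes a use-after-rename bug: os.rename has already moved the upload, so opening the original file path would raise FileNotFoundError; the CSV is now read from its renamed path.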

View File

@@ -138,7 +138,7 @@ async def check_flag_elab(pool: object) -> None:
     async with pool.acquire() as conn:
         async with conn.cursor() as cur:
             try:
-                await cur.execute("SELECT ferma_elab from admin_panel")
+                await cur.execute("SELECT stop_elab from admin_panel")
                 results = await cur.fetchone()
                 return results[0]
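The query now selects the stop_elab column (formerly ferma_elab), presumably tracking a rename in admin_panel; this is the flag the elaboration orchestrator polls through check_flag_elab(pool). Note the hunk header still annotates the function -> None even though it returns the flag value.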