Compare commits

...

9 Commits

SHA1 Message Date
35527c89cd fix ftp 2025-09-15 22:32:12 +02:00
8cd5a21275 fix flag elab 2025-09-15 22:06:01 +02:00
2d2668c92c setting vscode 2025-09-12 20:54:21 +02:00
adfe2e7809 fix cread user dir 2025-09-12 20:52:11 +02:00
1a99b55dbb add flag stop elab 2025-09-11 21:28:42 +02:00
54cb20b6af pylint 2 2025-09-03 21:22:35 +02:00
39dba8f54a fix pylint 2025-09-03 21:05:19 +02:00
9b3f1171f3 gitignore 2025-09-03 20:48:54 +02:00
f7e2efa03e resync toml 2025-09-03 20:39:55 +02:00
14 changed files with 435 additions and 138 deletions

16
.gitignore vendored Normal file
View File

@@ -0,0 +1,16 @@
*.pyc
*.toml
.python-version
uv.lock
*.log*
.vscode/settings.json
README.md
prova*.*
.codegpt
build/
LoadCSVData.pl
matlab_elab.py
doc_carri.txt
ase.egg-info/
site/
site.zip

2
.pylintrc Normal file
View File

@@ -0,0 +1,2 @@
# Alternatively, if you want to be more permissive
max-line-length=140

4
.vscode/setting.json vendored Normal file
View File

@@ -0,0 +1,4 @@
{
"flake8.args": ["--max-line-length=140"],
"python.linting.flake8Args": ["--config","flake8.cfg"]
}

91
docs/gen_ref_pages.py Normal file
View File

@@ -0,0 +1,91 @@
"""Genera le pagine di riferimento per l'API."""
from pathlib import Path
import mkdocs_gen_files
nav = mkdocs_gen_files.Nav()
# File e directory da escludere
EXCLUDE_PATTERNS = {
".env",
".env.*",
"__pycache__",
".git",
".pytest_cache",
".venv",
"venv",
"node_modules",
"docs", # Escludi tutta la directory docs
"build",
"dist",
"*.egg-info",
".mypy_cache",
".coverage",
"htmlcov"
}
def should_exclude(path: Path) -> bool:
"""Verifica se un percorso deve essere escluso."""
# Escludi file .env
if path.name.startswith('.env'):
return True
# Escludi lo script stesso
if path.name == "gen_ref_pages.py":
return True
# Escludi tutta la directory docs
if "old_script" in path.parts:
return True
# Escludi tutta la directory docs
if "docs" in path.parts:
return True
# Escludi pattern comuni
for pattern in EXCLUDE_PATTERNS:
if pattern in str(path):
return True
return False
# Cerca i file Python nella directory corrente
for path in sorted(Path(".").rglob("*.py")):
# Salta i file esclusi
if should_exclude(path):
continue
# Salta i file che iniziano con un punto
if any(part.startswith('.') for part in path.parts):
continue
# Salta i file che iniziano con prova
if any(part.startswith('prova') for part in path.parts):
continue
if any(part.startswith('matlab_elab') for part in path.parts):
continue
module_path = path.with_suffix("")
doc_path = path.with_suffix(".md")
full_doc_path = Path("reference", doc_path)
parts = tuple(module_path.parts)
if parts[-1] == "__init__":
parts = parts[:-1]
doc_path = doc_path.with_name("index.md")
full_doc_path = full_doc_path.with_name("index.md")
elif parts[-1] == "__main__":
continue
nav[parts] = doc_path.as_posix()
with mkdocs_gen_files.open(full_doc_path, "w") as fd:
ident = ".".join(parts)
fd.write(f"::: {ident}")
mkdocs_gen_files.set_edit_path(full_doc_path, path)
with mkdocs_gen_files.open("reference/SUMMARY.md", "w") as nav_file:
nav_file.writelines(nav.build_literate_nav())

36
docs/index.md Normal file
View File

@@ -0,0 +1,36 @@
# Welcome to the documentation
This is the automatic documentation of the ASE Python application for managing the CSV files received via FTP.
## Features
- Reception of CSV files via FTP and storage in the database.
- Loading of the data into the database, with dedicated modules for:
- control-unit and sensor type
- control-unit and sensor name
- Execution of the MatLab elaborations.
- FTP user management
- Bulk loading of FTP users from the database
## Setup
- customize the env files:
- env/db.ini
- env/ftp.ini
- env/load.ini
- env/elab.ini
- run the FTP server -> "python ftp_csv_receiver.py"
- run the orchestrator that loads the CSV files -> "python load_orchestrator.py"
- run the orchestrator of the MatLab elaborations -> "python elab_orchestrator.py"
systemd services can be created to run these features automatically (a unit sketch follows below).
A virtualenv is used, so python must be run with the proper settings.
## Installation
Install the ase-x.x.x-py3-none-any.whl package
- pip install ase-x.x.x-py3-none-any.whl
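As an illustration of the systemd note in the index above: a minimal unit sketch, assuming the app is deployed under /opt/ase with its .venv virtualenv. The unit name, user, and paths are hypothetical, not part of the package.

# /etc/systemd/system/ase-ftp.service (hypothetical name and paths)
[Unit]
Description=ASE FTP CSV receiver
After=network.target

[Service]
User=ase
WorkingDirectory=/opt/ase
# Use the virtualenv interpreter so python runs with the proper settings
ExecStart=/opt/ase/.venv/bin/python ftp_csv_receiver.py
Restart=on-failure

[Install]
WantedBy=multi-user.target

The two orchestrators can get analogous units; enable each with systemctl enable --now.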

66
mkdocs.yml Normal file
View File

@@ -0,0 +1,66 @@
site_name: Ase receiver
site_description: Automatic documentation of the ASE Python app
theme:
name: material
features:
- navigation.tabs
- navigation.sections
- toc.integrate
- navigation.top
- search.suggest
- search.highlight
- content.tabs.link
- content.code.annotation
- content.code.copy
plugins:
- offline
- search
- mkdocstrings:
handlers:
python:
paths: ["."]
options:
docstring_style: google
show_source: true
show_root_heading: true
show_root_toc_entry: true
show_symbol_type_heading: true
show_symbol_type_toc: true
filters:
- "!^docs" # Escludi tutto ciò che inizia con "docs"
- gen-files:
scripts:
- docs/gen_ref_pages.py
- literate-nav:
nav_file: SUMMARY.md
nav:
- Home: index.md
- API Reference: reference/
markdown_extensions:
- pymdownx.highlight:
anchor_linenums: true
- pymdownx.inlinehilite
- pymdownx.snippets
- pymdownx.superfences
- pymdownx.tabbed:
alternate_style: true
- admonition
- pymdownx.details
- attr_list
- md_in_html
# Exclude files from the build
exclude_docs: |
.env*
__pycache__/
.git/
.pytest_cache/
.venv/
venv/
test/
.vscode/

View File

@@ -20,6 +20,7 @@ dev = [
"mkdocs-literate-nav>=0.6.2",
"mkdocs-material>=9.6.15",
"mkdocstrings[python]>=0.29.1",
"ruff>=0.12.11",
]
[tool.setuptools]
@@ -27,4 +28,4 @@ package-dir = {"" = "src"}
[tool.setuptools.packages.find]
exclude = ["test","build"]
where = ["src"]
where = ["src"]

View File

@@ -1,4 +1,7 @@
#!.venv/bin/python
"""
Orchestrator of the workers that launch the elaborations
"""
# Import necessary libraries
import logging
@@ -7,7 +10,7 @@ import asyncio
# Import custom modules for configuration and database connection
from utils.config import loader_matlab_elab as setting
from utils.database import WorkflowFlags
from utils.database.action_query import get_tool_info
from utils.database.action_query import get_tool_info, check_flag_elab
from utils.csv.loaders import get_next_csv_atomic
from utils.orchestrator_utils import run_orchestrator, worker_context
from utils.database.loader_action import update_status, unlock
@@ -22,6 +25,7 @@ ELAB_PROCESSING_DELAY = 0.2
# Wait time when there are no records to process
NO_RECORD_SLEEP = 60
async def worker(worker_id: int, cfg: object, pool: object) -> None:
"""Esegue il ciclo di lavoro per l'elaborazione dei dati caricati.
@@ -43,69 +47,69 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
while True:
try:
logger.info("Inizio elaborazione")
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED)
if record:
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
if tool_type.lower() != "gd": # i tool GD non devono essere elaborati ???
tool_elab_info = await get_tool_info(WorkflowFlags.DATA_ELABORATED, unit_name.upper(), tool_name.upper(), pool)
if tool_elab_info:
if tool_elab_info['statustools'].lower() in cfg.elab_status:
logger.info(f"Elaborazione id {id} per {unit_name} {tool_name} ")
await update_status(cfg, id, WorkflowFlags.START_ELAB, pool)
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}"
proc = await asyncio.create_subprocess_shell(
matlab_cmd,
cwd=cfg.matlab_func_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
if not await check_flag_elab(pool):
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED)
if record:
rec_id, _, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
if tool_type.lower() != "gd": # i tool GD non devono essere elaborati ???
tool_elab_info = await get_tool_info(WorkflowFlags.DATA_ELABORATED, unit_name.upper(), tool_name.upper(), pool)
if tool_elab_info:
if tool_elab_info['statustools'].lower() in cfg.elab_status:
logger.info("Elaborazione ID %s per %s %s", rec_id, unit_name, tool_name)
await update_status(cfg, rec_id, WorkflowFlags.START_ELAB, pool)
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}"
proc = await asyncio.create_subprocess_shell(
matlab_cmd,
cwd=cfg.matlab_func_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
logger.error("Errore durante l'elaborazione")
logger.error(stderr.decode().strip())
if proc.returncode == 124:
error_type = f"Matlab elab excessive duration: killed after {cfg.matlab_timeout} seconds."
else:
error_type = f"Matlab elab failed: {proc.returncode}."
# to be verified: where to pick up the logs
# with open(f"{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt", "w") as f:
# f.write(stderr.decode().strip())
# errors = [line for line in stderr.decode().strip() if line.startswith("Error")]
# warnings = [line for line in stderr.decode().strip() if not line.startswith("Error")]
errors, warnings = await read_error_lines_from_logs(cfg.matlab_error_path, f"_{unit_name}_{tool_name}*_*_output_error.txt")
await send_error_email(unit_name.upper(), tool_name.upper(), tool_elab_info['matcall'], error_type, errors, warnings)
if proc.returncode != 0:
logger.error("Errore durante l'elaborazione")
logger.error(stderr.decode().strip())
if proc.returncode == 124:
error_type = f"Matlab elab excessive duration: killed after {cfg.matlab_timeout} seconds."
else:
error_type = f"Matlab elab failed: {proc.returncode}."
"""
to be verified: where to pick up the logs
with open(f"{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt", "w") as f:
f.write(stderr.decode().strip())
# errors = [line for line in stderr.decode().strip() if line.startswith("Error")]
# warnings = [line for line in stderr.decode().strip() if not line.startswith("Error")]
"""
errors, warnings = await read_error_lines_from_logs(cfg.matlab_error_path, f"_{unit_name}_{tool_name}*_*_output_error.txt")
await send_error_email(unit_name.upper(), tool_name.upper(), tool_elab_info['matcall'], error_type, errors, warnings)
logger.info(stdout.decode().strip())
await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
await unlock(cfg, rec_id, pool)
await asyncio.sleep(ELAB_PROCESSING_DELAY)
else:
logger.info(stdout.decode().strip())
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await unlock(cfg, id, pool)
await asyncio.sleep(ELAB_PROCESSING_DELAY)
else:
logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['statustools']}: MatLab calc by-passed.")
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await update_status(cfg, id, WorkflowFlags.DUMMY_ELABORATED, pool)
await unlock(cfg, id, pool)
else:
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await update_status(cfg, id, WorkflowFlags.DUMMY_ELABORATED, pool)
await unlock(cfg, id, pool)
logger.info("ID %s %s - %s %s: MatLab calc by-passed.", rec_id, unit_name, tool_name, tool_elab_info['statustools'])
await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool)
await unlock(cfg, rec_id, pool)
else:
await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool)
await unlock(cfg, rec_id, pool)
else:
logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP)
else:
logger.info("Nessun record disponibile")
logger.info("Flag fermo elaborazione attivato")
await asyncio.sleep(NO_RECORD_SLEEP)
except Exception as e:
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
except Exception as e: # pylint: disable=broad-except
logger.error("Errore durante l'esecuzione: %s", e, exc_info=debug_mode)
await asyncio.sleep(1)

View File

@@ -1,27 +1,32 @@
#!.venv/bin/python
"""This module implements an FTP server with custom commands for managing virtual users and handling CSV file uploads."""
"""
This module implements an FTP server with custom commands for
managing virtual users and handling CSV file uploads.
"""
import os
import logging
from hashlib import sha256
from pathlib import Path
from utils.config import loader_ftp_csv as setting
from utils.database.connection import connetti_db
from utils.connect import user_admin, file_management
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed
from utils.config import loader_ftp_csv as setting
from utils.database.connection import connetti_db
from utils.connect import user_admin, file_management
# Configure logging (moved inside main function)
logger = logging.getLogger(__name__)
class DummySha256Authorizer(DummyAuthorizer):
"""Custom authorizer that uses SHA256 for password hashing and manages users from a database."""
def __init__(self: object, cfg: object) -> None:
def __init__(self: object, cfg: dict) -> None:
"""Initializes the authorizer, adds the admin user, and loads users from the database.
Args:
@@ -29,39 +34,45 @@ class DummySha256Authorizer(DummyAuthorizer):
"""
super().__init__()
self.add_user(
cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3])
cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3]
)
# Define the database connection
conn = connetti_db(cfg)
# Create a cursor
cur = conn.cursor()
cur.execute(f'SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL')
cur.execute(
f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL"
)
for ftpuser, hash, virtpath, perm in cur.fetchall():
self.add_user(ftpuser, hash, virtpath, perm)
"""
Create the user's directory if it does not exist.
"""
for ftpuser, user_hash, virtpath, perm in cur.fetchall():
# Create the user's directory if it does not exist.
try:
Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
except Exception as e:
self.responde(f'551 Error in create virtual user path: {e}')
self.add_user(ftpuser, user_hash, virtpath, perm)
except Exception as e: # pylint: disable=broad-except
self.responde(f"551 Error in create virtual user path: {e}")
def validate_authentication(self: object, username: str, password: str, handler: object) -> None:
# Validate the user's password against the stored hash
hash = sha256(password.encode("UTF-8")).hexdigest()
def validate_authentication(
self: object, username: str, password: str, handler: object
) -> None:
# Validate the user's password against the stored user_hash
user_hash = sha256(password.encode("UTF-8")).hexdigest()
try:
if self.user_table[username]["pwd"] != hash:
if self.user_table[username]["pwd"] != user_hash:
raise KeyError
except KeyError:
raise AuthenticationFailed
class ASEHandler(FTPHandler):
"""Custom FTP handler that extends FTPHandler with custom commands and file handling."""
def __init__(self: object, conn: object, server: object, ioloop:object=None) -> None:
def __init__(
self: object, conn: object, server: object, ioloop: object = None
) -> None:
"""Initializes the handler, adds custom commands, and sets up command permissions.
Args:
@@ -73,20 +84,44 @@ class ASEHandler(FTPHandler):
self.proto_cmds = FTPHandler.proto_cmds.copy()
# Add custom FTP commands for managing virtual users - command in lowercase
self.proto_cmds.update(
{'SITE ADDU': dict(perm='M', auth=True, arg=True,
help='Syntax: SITE <SP> ADDU USERNAME PASSWORD (add virtual user).')}
{
"SITE ADDU": dict(
perm="M",
auth=True,
arg=True,
help="Syntax: SITE <SP> ADDU USERNAME PASSWORD (add virtual user).",
)
}
)
self.proto_cmds.update(
{'SITE DISU': dict(perm='M', auth=True, arg=True,
help='Syntax: SITE <SP> DISU USERNAME (disable virtual user).')}
{
"SITE DISU": dict(
perm="M",
auth=True,
arg=True,
help="Syntax: SITE <SP> DISU USERNAME (disable virtual user).",
)
}
)
self.proto_cmds.update(
{'SITE ENAU': dict(perm='M', auth=True, arg=True,
help='Syntax: SITE <SP> ENAU USERNAME (enable virtual user).')}
{
"SITE ENAU": dict(
perm="M",
auth=True,
arg=True,
help="Syntax: SITE <SP> ENAU USERNAME (enable virtual user).",
)
}
)
self.proto_cmds.update(
{'SITE LSTU': dict(perm='M', auth=True, arg=None,
help='Syntax: SITE <SP> LSTU (list virtual users).')}
{
"SITE LSTU": dict(
perm="M",
auth=True,
arg=None,
help="Syntax: SITE <SP> LSTU (list virtual users).",
)
}
)
def on_file_received(self: object, file: str) -> None:
@@ -111,6 +146,7 @@ class ASEHandler(FTPHandler):
def ftp_SITE_LSTU(self: object, line: str) -> None:
return user_admin.ftp_SITE_LSTU(self, line)
def main():
"""Main function to start the FTP server."""
# Load the configuration settings
@@ -140,9 +176,8 @@ def main():
server.serve_forever()
except Exception as e:
logger.error(
f"Exit with error: {e}."
)
logger.error("Exit with error: %s.", e)
if __name__ == "__main__":
main()
main()

View File

@@ -1,15 +1,16 @@
#!/usr/bin/env python3
#!.venv/bin/python
"""
Script that fetches data from MySQL and sends FTP SITE commands
"""
import mysql.connector
from utils.database.connection import connetti_db
from utils.config import users_loader as setting
from ftplib import FTP
import logging
import sys
from typing import List, Tuple
import mysql.connector
from utils.database.connection import connetti_db
from utils.config import users_loader as setting
# Logging configuration
logging.basicConfig(
@@ -38,8 +39,8 @@ def connect_ftp() -> FTP:
ftp.login(FTP_CONFIG['user'], FTP_CONFIG['password'])
logger.info("Connessione FTP stabilita")
return ftp
except Exception as e:
logger.error(f"Errore connessione FTP: {e}")
except Exception as e: # pylint: disable=broad-except
logger.error("Errore connessione FTP: %s", e)
sys.exit(1)
def fetch_data_from_db(connection: mysql.connector.MySQLConnection) -> List[Tuple]:
@@ -63,11 +64,11 @@ def fetch_data_from_db(connection: mysql.connector.MySQLConnection) -> List[Tupl
cursor.execute(query)
results = cursor.fetchall()
logger.info(f"Prelevate {len(results)} righe dal database")
logger.info("Prelevate %s righe dal database", len(results))
return results
except mysql.connector.Error as e:
logger.error(f"Errore query database: {e}")
logger.error("Errore query database: %s", e)
return []
finally:
cursor.close()
@@ -82,14 +83,13 @@ def send_site_command(ftp: FTP, command: str) -> bool:
Returns:
bool: True if the command was sent successfully, False otherwise.
"""
"""Invia un comando SITE al server FTP"""
try:
# The SITE command is sent using sendcmd
response = ftp.sendcmd(f"SITE {command}")
logger.info(f"Comando SITE '{command}' inviato. Risposta: {response}")
logger.info("Comando SITE %s inviato. Risposta: %s", command, response)
return True
except Exception as e:
logger.error(f"Errore invio comando SITE '{command}': {e}")
except Exception as e: # pylint: disable=broad-except
logger.error("Errore invio comando SITE %s: %s", command, e)
return False
def main():
@@ -121,7 +121,7 @@ def main():
# Build the complete SITE command
ftp_site_command = f'addu {username} {password}'
logger.info(f"Sending ftp command: {ftp_site_command}")
logger.info("Sending ftp command: %s", ftp_site_command)
# Send the SITE command
if send_site_command(ftp_connection, ftp_site_command):
@@ -129,24 +129,24 @@ def main():
else:
error_count += 1
logger.info(f"Elaborazione completata. Successi: {success_count}, Errori: {error_count}")
logger.info("Elaborazione completata. Successi: %s, Errori: %s", success_count, error_count)
except Exception as e:
logger.error(f"Errore generale: {e}")
except Exception as e: # pylint: disable=broad-except
logger.error("Errore generale: %s", e)
finally:
# Close the connections
try:
ftp_connection.quit()
logger.info("Connessione FTP chiusa")
except Exception as e:
logger.error(f"Errore chiusura connessione FTP: {e}")
except Exception as e: # pylint: disable=broad-except
logger.error("Errore chiusura connessione FTP: %s", e)
try:
db_connection.close()
logger.info("Connessione MySQL chiusa")
except Exception as e:
logger.error(f"Errore chiusura connessione MySQL: {e}")
except Exception as e: # pylint: disable=broad-except
logger.error("Errore chiusura connessione MySQL: %s", e)
if __name__ == "__main__":
main()

View File

@@ -1,4 +1,7 @@
#!.venv/bin/python
"""
Orchestrator of the workers that load the data into dataraw
"""
# Import necessary libraries
import logging
@@ -19,7 +22,8 @@ CSV_PROCESSING_DELAY = 0.2
# Wait time when there are no records to process
NO_RECORD_SLEEP = 60
async def worker(worker_id: int, cfg: object, pool: object) -> None:
async def worker(worker_id: int, cfg: dict, pool: object) -> None:
"""Esegue il ciclo di lavoro per l'elaborazione dei file CSV.
Il worker preleva un record CSV dal database, ne elabora il contenuto
@@ -27,7 +31,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
Args:
worker_id (int): L'ID univoco del worker.
cfg (object): L'oggetto di configurazione.
cfg (dict): L'oggetto di configurazione.
pool (object): Il pool di connessioni al database.
"""
# Imposta il context per questo worker
@@ -38,8 +42,12 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
while True:
try:
logger.info("Inizio elaborazione")
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.CSV_RECEIVED, WorkflowFlags.DATA_LOADED)
record = await get_next_csv_atomic(
pool,
cfg.dbrectable,
WorkflowFlags.CSV_RECEIVED,
WorkflowFlags.DATA_LOADED,
)
if record:
success = await load_csv(record, cfg, pool)
@@ -50,8 +58,8 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP)
except Exception as e:
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=1)
except Exception as e: # pylint: disable=broad-except
logger.error("Errore durante l'esecuzione: %s", e, exc_info=1)
await asyncio.sleep(1)
@@ -59,7 +67,8 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
"""Carica ed elabora un record CSV utilizzando il modulo di parsing appropriato.
Args:
record: Una tupla contenente i dettagli del record CSV da elaborare (id, unit_type, tool_type, unit_name, tool_name).
record: Una tupla contenente i dettagli del record CSV da elaborare
(rec_id, unit_type, tool_type, unit_name, tool_name).
cfg: L'oggetto di configurazione contenente i parametri del sistema.
pool (object): Il pool di connessioni al database.
@@ -70,11 +79,16 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
logger.debug("Inizio ricerca nuovo CSV da elaborare")
id, unit_type, tool_type, unit_name, tool_name = [
rec_id, unit_type, tool_type, unit_name, tool_name = [
x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record
]
logger.info(
f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}"
"Trovato CSV da elaborare: ID=%s, Tipo=%s_%s, Nome=%s_%s",
rec_id,
unit_type,
tool_type,
unit_name,
tool_name,
)
# Build the name of the module to load dynamically
@@ -87,27 +101,29 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
modulo = None
for module_name in module_names:
try:
logger.debug(f"Caricamento dinamico del modulo: {module_name}")
logger.debug("Caricamento dinamico del modulo: %s", module_name)
modulo = importlib.import_module(module_name)
logger.info(f"Funzione 'main_loader' caricata dal modulo {module_name}")
logger.info("Funzione 'main_loader' caricata dal modulo %s", module_name)
break
except (ImportError, AttributeError) as e:
logger.debug(
f"Modulo {module_name} non presente o non valido. {e}",
"Modulo %s non presente o non valido. %s",
module_name,
e,
exc_info=debug_mode,
)
if not modulo:
logger.error(f"Nessun modulo trovato {module_names}")
logger.error("Nessun modulo trovato %s", module_names)
return False
# Get the 'main_loader' function from the module
funzione = getattr(modulo, "main_loader")
# Run the function
logger.info(f"Elaborazione con modulo {modulo} per ID={id}")
await funzione(cfg, id, pool)
logger.info(f"Elaborazione completata per ID={id}")
logger.info("Elaborazione con modulo %s per ID=%s", modulo, rec_id)
await funzione(cfg, rec_id, pool)
logger.info("Elaborazione completata per ID=%s", rec_id)
return True

View File

@@ -1,4 +1,7 @@
#!.venv/bin/python
"""
Orchestrator of the workers that send the data to the customers
"""
# Import necessary libraries
import logging
@@ -11,8 +14,9 @@ from utils.csv.loaders import get_next_csv_atomic
from utils.orchestrator_utils import run_orchestrator, worker_context
from utils.connect.send_data import process_workflow_record
from utils.general import alterna_valori
#from utils.ftp.send_data import ftp_send_elab_csv_to_customer, api_send_elab_csv_to_customer, ftp_send_raw_csv_to_customer, api_send_raw_csv_to_customer
# from utils.ftp.send_data import ftp_send_elab_csv_to_customer, api_send_elab_csv_to_customer, \
# ftp_send_raw_csv_to_customer, api_send_raw_csv_to_customer
# Initialize the logger for this module
@@ -23,7 +27,8 @@ ELAB_PROCESSING_DELAY = 0.2
# Wait time when there are no records to process
NO_RECORD_SLEEP = 30
async def worker(worker_id: int, cfg: object, pool: object) -> None:
async def worker(worker_id: int, cfg: dict, pool: object) -> None:
"""Esegue il ciclo di lavoro per l'invio dei dati.
Il worker preleva un record dal database che indica dati pronti per
@@ -32,7 +37,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
Args:
worker_id (int): L'ID univoco del worker.
cfg (object): L'oggetto di configurazione.
cfg (dict): L'oggetto di configurazione.
pool (object): Il pool di connessioni al database.
"""
@@ -42,7 +47,10 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
logger.info("Avviato")
alternatore = alterna_valori([WorkflowFlags.CSV_RECEIVED,WorkflowFlags.SENT_RAW_DATA], [WorkflowFlags.DATA_ELABORATED, WorkflowFlags.SENT_ELAB_DATA])
alternatore = alterna_valori(
[WorkflowFlags.CSV_RECEIVED, WorkflowFlags.SENT_RAW_DATA],
[WorkflowFlags.DATA_ELABORATED, WorkflowFlags.SENT_ELAB_DATA],
)
while True:
try:
@@ -58,8 +66,8 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP)
except Exception as e:
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
except Exception as e: # pylint: disable=broad-except
logger.error("Errore durante l'esecuzione: %s", e, exc_info=debug_mode)
await asyncio.sleep(1)
@@ -67,5 +75,6 @@ async def main():
"""Funzione principale che avvia il send_orchestrator."""
await run_orchestrator(setting.Config, worker)
if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())

View File

@@ -1,10 +1,10 @@
import os
from datetime import datetime
import logging
import re
import mysql.connector
from utils.database.connection import connetti_db
from utils.csv.parser import extract_value
logger = logging.getLogger(__name__)
@@ -27,8 +27,11 @@ def on_file_received(self: object, file: str) -> None:
cfg = self.cfg
path, filenameExt = os.path.split(file)
filename, fileExtension = os.path.splitext(filenameExt)
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
new_filename = f"{filename}_{timestamp}{fileExtension}"
os.rename(file, f"{path}/{new_filename}")
if (fileExtension.upper() in (cfg.fileext)):
with open(file, 'r', encoding='utf-8', errors='ignore') as csvfile:
with open(f"{path}/{new_filename}", 'r', encoding='utf-8', errors='ignore') as csvfile:
lines = csvfile.readlines()
unit_name = extract_value(cfg.units_name, filename, str(lines[0:10]))
@@ -74,13 +77,15 @@ def on_file_received(self: object, file: str) -> None:
tool_info = f'{{"Stazione": "{cfg.ts_pini_path_match.get(stazione)}"}}'
try:
cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info) VALUES (%s,%s, %s, %s, %s, %s, %s, %s)", (self.username, filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), ''.join(lines), tool_info))
cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info) VALUES (%s,%s, %s, %s, %s, %s, %s, %s)", (self.username, new_filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), ''.join(lines), tool_info))
conn.commit()
conn.close()
except Exception as e:
logger.error(f'File {file} not loaded. Held in user path.')
logger.error(f'File {new_filename} not loaded. Held in user path.')
logger.error(f'{e}')
"""
else:
os.remove(file)
logger.info(f'File {file} loaded: removed.')
logger.info(f'File {new_filename} removed.')
"""

View File

@@ -132,4 +132,16 @@ async def get_elab_timestamp(id_recv: int, pool: object) -> float:
except Exception as e:
logger.error(f"id {id_recv} - Errore nella query timestamp elaborazione: {e}")
return None
return None
async def check_flag_elab(pool: object) -> bool | None:
    """Read the stop_elab flag from the admin_panel table; returns None on error."""
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            try:
                await cur.execute("SELECT stop_elab from admin_panel")
                results = await cur.fetchone()
                return results[0]
            except Exception as e:  # pylint: disable=broad-except
                logger.error("Errore nella query check flag stop elaborazioni: %s", e)
                return None
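For context, the stop flag read by check_flag_elab above is a plain column on admin_panel; below is a minimal companion sketch for flipping it, written in the same async-pool style as the rest of this PR. The helper name, the UPDATE statement, and the single-row layout of admin_panel are assumptions for illustration, not part of this diff.

async def set_flag_elab(pool: object, stop: bool) -> None:
    """Hypothetical helper: set or clear the stop_elab flag checked by the elab workers."""
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # Assumes admin_panel holds a single settings row (an assumption, not confirmed schema)
            await cur.execute("UPDATE admin_panel SET stop_elab = %s", (int(stop),))
            await conn.commit()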