Compare commits


5 Commits

SHA1 Message Date
35527c89cd fix ftp 2025-09-15 22:32:12 +02:00
8cd5a21275 fix flag elab 2025-09-15 22:06:01 +02:00
2d2668c92c setting vscode 2025-09-12 20:54:21 +02:00
adfe2e7809 fix cread user dir 2025-09-12 20:52:11 +02:00
1a99b55dbb add flag stop elab 2025-09-11 21:28:42 +02:00
11 changed files with 279 additions and 63 deletions

.gitignore vendored

@@ -8,11 +8,9 @@ README.md
 prova*.*
 .codegpt
 build/
-docs/
 LoadCSVData.pl
 matlab_elab.py
 doc_carri.txt
 ase.egg-info/
-mkdocs.yml
 site/
 site.zip

.vscode/setting.json vendored Normal file

@@ -0,0 +1,4 @@
{
    "flake8.args": ["--max-line-length=140"],
    "python.linting.flake8Args": ["--config","flake8.cfg"]
}

docs/gen_ref_pages.py Normal file

@@ -0,0 +1,91 @@
"""Genera le pagine di riferimento per l'API."""
from pathlib import Path
import mkdocs_gen_files
nav = mkdocs_gen_files.Nav()
# File e directory da escludere
EXCLUDE_PATTERNS = {
".env",
".env.*",
"__pycache__",
".git",
".pytest_cache",
".venv",
"venv",
"node_modules",
"docs", # Escludi tutta la directory docs
"build",
"dist",
"*.egg-info",
".mypy_cache",
".coverage",
"htmlcov"
}
def should_exclude(path: Path) -> bool:
"""Verifica se un percorso deve essere escluso."""
# Escludi file .env
if path.name.startswith('.env'):
return True
# Escludi lo script stesso
if path.name == "gen_ref_pages.py":
return True
# Escludi tutta la directory docs
if "old_script" in path.parts:
return True
# Escludi tutta la directory docs
if "docs" in path.parts:
return True
# Escludi pattern comuni
for pattern in EXCLUDE_PATTERNS:
if pattern in str(path):
return True
return False
# Cerca i file Python nella directory corrente
for path in sorted(Path(".").rglob("*.py")):
# Salta i file esclusi
if should_exclude(path):
continue
# Salta i file che iniziano con un punto
if any(part.startswith('.') for part in path.parts):
continue
# Salta i file che iniziano con prova
if any(part.startswith('prova') for part in path.parts):
continue
if any(part.startswith('matlab_elab') for part in path.parts):
continue
module_path = path.with_suffix("")
doc_path = path.with_suffix(".md")
full_doc_path = Path("reference", doc_path)
parts = tuple(module_path.parts)
if parts[-1] == "__init__":
parts = parts[:-1]
doc_path = doc_path.with_name("index.md")
full_doc_path = full_doc_path.with_name("index.md")
elif parts[-1] == "__main__":
continue
nav[parts] = doc_path.as_posix()
with mkdocs_gen_files.open(full_doc_path, "w") as fd:
ident = ".".join(parts)
fd.write(f"::: {ident}")
mkdocs_gen_files.set_edit_path(full_doc_path, path)
with mkdocs_gen_files.open("reference/SUMMARY.md", "w") as nav_file:
nav_file.writelines(nav.build_literate_nav())
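
For each module that survives the filters, the loop writes a one-line mkdocstrings stub under reference/. For illustration: utils/csv/parser.py (imported elsewhere in this compare) would produce reference/utils/csv/parser.md containing only:

::: utils.csv.parser

while reference/SUMMARY.md collects the corresponding literate-nav entries (lines such as "* [parser](utils/csv/parser.md)").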

docs/index.md Normal file

@@ -0,0 +1,36 @@
# Welcome to the documentation

This is the auto-generated documentation for the ASE Python application, which manages CSV files received via FTP.

## Features

- Reception of CSV files via FTP and storage in a database.
- Loading of the data into the database, with dedicated modules for:
    - control-unit and sensor type
    - control-unit and sensor name
- Execution of the MatLab processing.
- FTP user management
- Bulk loading of FTP users from the database

## Setup

- Customize the env files:
    - env/db.ini
    - env/ftp.ini
    - env/load.ini
    - env/elab.ini
- Run the FTP server -> "python ftp_csv_receiver.py"
- Run the CSV load orchestrator -> "python load_orchestrator.py"
- Run the MatLab processing orchestrator -> "python elab_orchestrator.py"

systemd services can be created to run these components automatically. A virtualenv is used, so python must be invoked with the appropriate settings.

## Installation

Install the ase-x.x.x-py3-none-any.whl package:

- pip install ase-x.x.x-py3-none-any.whl
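
As a minimal sketch of the systemd setup this file mentions, the unit below runs the FTP receiver through the virtualenv's interpreter. The install path /opt/ase, the .venv location, and the ase service user are illustrative assumptions, not part of the package:

[Unit]
Description=ASE FTP CSV receiver
After=network.target

[Service]
# Run through the virtualenv's python so the required settings apply
WorkingDirectory=/opt/ase
ExecStart=/opt/ase/.venv/bin/python ftp_csv_receiver.py
Restart=on-failure
User=ase

[Install]
WantedBy=multi-user.target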

mkdocs.yml Normal file

@@ -0,0 +1,66 @@
site_name: Ase receiver
site_description: Automatic documentation for the ASE Python app

theme:
  name: material
  features:
    - navigation.tabs
    - navigation.sections
    - toc.integrate
    - navigation.top
    - search.suggest
    - search.highlight
    - content.tabs.link
    - content.code.annotate
    - content.code.copy

plugins:
  - offline
  - search
  - mkdocstrings:
      handlers:
        python:
          paths: ["."]
          options:
            docstring_style: google
            show_source: true
            show_root_heading: true
            show_root_toc_entry: true
            show_symbol_type_heading: true
            show_symbol_type_toc: true
            filters:
              - "!^docs"  # Exclude everything that starts with "docs"
  - gen-files:
      scripts:
        - docs/gen_ref_pages.py
  - literate-nav:
      nav_file: SUMMARY.md

nav:
  - Home: index.md
  - API Reference: reference/

markdown_extensions:
  - pymdownx.highlight:
      anchor_linenums: true
  - pymdownx.inlinehilite
  - pymdownx.snippets
  - pymdownx.superfences
  - pymdownx.tabbed:
      alternate_style: true
  - admonition
  - pymdownx.details
  - attr_list
  - md_in_html

# Exclude files from the build
exclude_docs: |
  .env*
  __pycache__/
  .git/
  .pytest_cache/
  .venv/
  venv/
  test/
  .vscode/
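
Note the division of labor in this config: mkdocstrings renders the "::: module" stubs, gen-files runs docs/gen_ref_pages.py at build time to create them, and literate-nav turns the generated reference/SUMMARY.md into the "API Reference" section of the nav. Running mkdocs build (or mkdocs serve for a live preview) regenerates the whole reference tree; the output lands in site/, which .gitignore still excludes.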


@@ -10,7 +10,7 @@ import asyncio
 # Import custom modules for configuration and database connection
 from utils.config import loader_matlab_elab as setting
 from utils.database import WorkflowFlags
-from utils.database.action_query import get_tool_info
+from utils.database.action_query import get_tool_info, check_flag_elab
 from utils.csv.loaders import get_next_csv_atomic
 from utils.orchestrator_utils import run_orchestrator, worker_context
 from utils.database.loader_action import update_status, unlock
@@ -47,6 +47,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
     while True:
         try:
             logger.info("Inizio elaborazione")
+            if not await check_flag_elab(pool):
                 record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED)
                 if record:
                     rec_id, _, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
@@ -103,6 +104,9 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
                 else:
                     logger.info("Nessun record disponibile")
                     await asyncio.sleep(NO_RECORD_SLEEP)
+            else:
+                logger.info("Flag fermo elaborazione attivato")
+                await asyncio.sleep(NO_RECORD_SLEEP)
         except Exception as e:  # pylint: disable=broad-except
             logger.error("Errore durante l'esecuzione: %s", e, exc_info=debug_mode)


@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
 class DummySha256Authorizer(DummyAuthorizer):
     """Custom authorizer that uses SHA256 for password hashing and manages users from a database."""

-    def __init__(self: object, cfg: object) -> None:
+    def __init__(self: object, cfg: dict) -> None:
         """Initializes the authorizer, adds the admin user, and loads users from the database.

         Args:
@@ -47,13 +47,14 @@ class DummySha256Authorizer(DummyAuthorizer):
         )
         for ftpuser, user_hash, virtpath, perm in cur.fetchall():
-            self.add_user(ftpuser, user_hash, virtpath, perm)
             # Create the user's directory if it does not exist.
             try:
                 Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
+                self.add_user(ftpuser, user_hash, virtpath, perm)
             except Exception as e:  # pylint: disable=broad-except
                 self.responde(f"551 Error in create virtual user path: {e}")

     def validate_authentication(
         self: object, username: str, password: str, handler: object
     ) -> None:
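
The fix here is ordering: add_user is now called only after the user's home directory has been created, so a failing mkdir no longer registers an FTP account whose directory does not exist (the "fix cread user dir" commit). The trade-off is that a mkdir failure now skips the user entirely rather than adding it without a directory.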


@@ -23,7 +23,7 @@ CSV_PROCESSING_DELAY = 0.2
 NO_RECORD_SLEEP = 60

-async def worker(worker_id: int, cfg: object, pool: object) -> None:
+async def worker(worker_id: int, cfg: dict, pool: object) -> None:
     """Runs the work loop for processing the CSV files.

     The worker fetches a CSV record from the database, processes its content
@@ -31,7 +31,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
     Args:
         worker_id (int): The unique ID of the worker.
-        cfg (object): The configuration object.
+        cfg (dict): The configuration object.
         pool (object): The database connection pool.
     """
     # Set up the context for this worker
@@ -42,7 +42,6 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
     while True:
         try:
             logger.info("Inizio elaborazione")
             record = await get_next_csv_atomic(
                 pool,
                 cfg.dbrectable,


@@ -28,7 +28,7 @@ ELAB_PROCESSING_DELAY = 0.2
 NO_RECORD_SLEEP = 30

-async def worker(worker_id: int, cfg: object, pool: object) -> None:
+async def worker(worker_id: int, cfg: dict, pool: object) -> None:
     """Runs the work loop for sending the data.

     The worker fetches a record from the database indicating data ready for
@@ -37,7 +37,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
     Args:
         worker_id (int): The unique ID of the worker.
-        cfg (object): The configuration object.
+        cfg (dict): The configuration object.
         pool (object): The database connection pool.
     """


@@ -1,10 +1,10 @@
 import os
+from datetime import datetime
 import logging
 import re
 import mysql.connector

 from utils.database.connection import connetti_db
 from utils.csv.parser import extract_value

 logger = logging.getLogger(__name__)
@@ -27,8 +27,11 @@ def on_file_received(self: object, file: str) -> None:
     cfg = self.cfg
     path, filenameExt = os.path.split(file)
     filename, fileExtension = os.path.splitext(filenameExt)
+    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
+    new_filename = f"{filename}_{timestamp}{fileExtension}"
+    os.rename(file, f"{path}/{new_filename}")
     if (fileExtension.upper() in (cfg.fileext)):
-        with open(file, 'r', encoding='utf-8', errors='ignore') as csvfile:
+        with open(f"{path}/{new_filename}", 'r', encoding='utf-8', errors='ignore') as csvfile:
             lines = csvfile.readlines()
             unit_name = extract_value(cfg.units_name, filename, str(lines[0:10]))
@@ -74,13 +77,15 @@ def on_file_received(self: object, file: str) -> None:
             tool_info = f'{{"Stazione": "{cfg.ts_pini_path_match.get(stazione)}"}}'
         try:
-            cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info) VALUES (%s,%s, %s, %s, %s, %s, %s, %s)", (self.username, filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), ''.join(lines), tool_info))
+            cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info) VALUES (%s,%s, %s, %s, %s, %s, %s, %s)", (self.username, new_filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), ''.join(lines), tool_info))
             conn.commit()
             conn.close()
         except Exception as e:
-            logger.error(f'File {file} not loaded. Held in user path.')
+            logger.error(f'File {new_filename} not loaded. Held in user path.')
             logger.error(f'{e}')
+        """
         else:
             os.remove(file)
-            logger.info(f'File {file} loaded: removed.')
+            logger.info(f'File {new_filename} removed.')
+        """

utils/database/action_query.py

@@ -133,3 +133,15 @@ async def get_elab_timestamp(id_recv: int, pool: object) -> float:
         except Exception as e:
             logger.error(f"id {id_recv} - Errore nella query timestamp elaborazione: {e}")
             return None
+
+
+async def check_flag_elab(pool: object) -> None:
+    async with pool.acquire() as conn:
+        async with conn.cursor() as cur:
+            try:
+                await cur.execute("SELECT stop_elab from admin_panel")
+                results = await cur.fetchone()
+                return results[0]
+            except Exception as e:
+                logger.error(f"Errore nella query check flag stop elaborazioni: {e}")
+                return None
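
Two details of check_flag_elab are easy to miss: it returns the raw stop_elab column value rather than a bool (the -> None annotation is inaccurate), and it returns None when the query fails, which the caller treats as "not stopped" since None is falsy. Pausing and resuming the workers is then a single-row update. A minimal sketch, assuming admin_panel is a one-row table with a stop_elab TINYINT column (only the table and column names appear in this compare) and an aiomysql-style pool:

async def set_stop_elab(pool: object, stop: bool) -> None:
    """Set or clear the flag that pauses the elaboration workers."""
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # 1 pauses the workers, 0 resumes them
            await cur.execute("UPDATE admin_panel SET stop_elab = %s", (int(stop),))
        await conn.commit()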