lint with ruff

2025-09-22 22:30:54 +02:00
parent 35527c89cd
commit fb2b2724ed
54 changed files with 585 additions and 432 deletions

View File

@@ -1,4 +1,6 @@
{
"python.analysis.autoImportCompletions": true,
"python.analysis.typeCheckingMode": "standard",
"flake8.args": ["--max-line-length=140"],
"python.linting.flake8Args": ["--config","flake8.cfg"]
}

View File

@@ -29,3 +29,27 @@ package-dir = {"" = "src"}
[tool.setuptools.packages.find]
exclude = ["test","build"]
where = ["src"]
[tool.ruff]
# Maximum line length
line-length = 160
[tool.ruff.lint]
# Lint rules to enable
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
]
# Rules to ignore
ignore = []
[tool.ruff.format]
# Formatter options (quote and indent style)
quote-style = "double"
indent-style = "space"
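
With this block in pyproject.toml, ruff picks up the settings automatically when run from the project root. Assuming ruff is installed in the project's virtualenv, the usual invocations are:

ruff check --fix .   # lint with the selected rule sets (E, W, F, I, B, C4, UP) and apply safe fixes
ruff format .        # reformat to double quotes and space indentation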

View File

@@ -4,18 +4,18 @@ Orchestratore dei worker che lanciano le elaborazioni
"""
# Import necessary libraries
import logging
import asyncio
import logging
# Import custom modules for configuration and database connection
from utils.config import loader_matlab_elab as setting
from utils.database import WorkflowFlags
from utils.database.action_query import get_tool_info, check_flag_elab
from utils.csv.loaders import get_next_csv_atomic
from utils.orchestrator_utils import run_orchestrator, worker_context
from utils.database.loader_action import update_status, unlock
from utils.connect.send_email import send_error_email
from utils.csv.loaders import get_next_csv_atomic
from utils.database import WorkflowFlags
from utils.database.action_query import check_flag_elab, get_tool_info
from utils.database.loader_action import unlock, update_status
from utils.general import read_error_lines_from_logs
from utils.orchestrator_utils import run_orchestrator, worker_context
# Initialize the logger for this module
logger = logging.getLogger()
@@ -51,18 +51,16 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED)
if record:
rec_id, _, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
if tool_type.lower() != "gd": # i tool GD non devono essere elaborati ???
if tool_type.lower() != "gd": # i tool GD non devono essere elaborati ???
tool_elab_info = await get_tool_info(WorkflowFlags.DATA_ELABORATED, unit_name.upper(), tool_name.upper(), pool)
if tool_elab_info:
if tool_elab_info['statustools'].lower() in cfg.elab_status:
if tool_elab_info["statustools"].lower() in cfg.elab_status:
logger.info("Elaborazione ID %s per %s %s", rec_id, unit_name, tool_name)
await update_status(cfg, rec_id, WorkflowFlags.START_ELAB, pool)
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}"
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh \
{cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}"
proc = await asyncio.create_subprocess_shell(
matlab_cmd,
cwd=cfg.matlab_func_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
matlab_cmd, cwd=cfg.matlab_func_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
@@ -82,9 +80,12 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
# errors = [line for line in stderr.decode().strip() if line.startswith("Error")]
# warnings = [line for line in stderr.decode().strip() if not line.startswith("Error")]
errors, warnings = await read_error_lines_from_logs(cfg.matlab_error_path, f"_{unit_name}_{tool_name}*_*_output_error.txt")
await send_error_email(unit_name.upper(), tool_name.upper(), tool_elab_info['matcall'], error_type, errors, warnings)
errors, warnings = await read_error_lines_from_logs(
cfg.matlab_error_path, f"_{unit_name}_{tool_name}*_*_output_error.txt"
)
await send_error_email(
unit_name.upper(), tool_name.upper(), tool_elab_info["matcall"], error_type, errors, warnings
)
else:
logger.info(stdout.decode().strip())
@@ -92,7 +93,9 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
await unlock(cfg, rec_id, pool)
await asyncio.sleep(ELAB_PROCESSING_DELAY)
else:
logger.info("ID %s %s - %s %s: MatLab calc by-passed.", rec_id, unit_name, tool_name, tool_elab_info['statustools'])
logger.info(
"ID %s %s - %s %s: MatLab calc by-passed.", rec_id, unit_name, tool_name, tool_elab_info["statustools"]
)
await update_status(cfg, rec_id, WorkflowFlags.DATA_ELABORATED, pool)
await update_status(cfg, rec_id, WorkflowFlags.DUMMY_ELABORATED, pool)
await unlock(cfg, rec_id, pool)
@@ -108,7 +111,7 @@ async def worker(worker_id: int, cfg: object, pool: object) -> None:
logger.info("Flag fermo elaborazione attivato")
await asyncio.sleep(NO_RECORD_SLEEP)
except Exception as e: # pylint: disable=broad-except
except Exception as e: # pylint: disable=broad-except
logger.error("Errore durante l'esecuzione: %s", e, exc_info=debug_mode)
await asyncio.sleep(1)
@@ -117,5 +120,6 @@ async def main():
"""Funzione principale che avvia l'elab_orchestrator."""
await run_orchestrator(setting.Config, worker)
if __name__ == "__main__":
asyncio.run(main())
asyncio.run(main())

View File

@@ -1,22 +1,21 @@
#!.venv/bin/python
"""
This module implements an FTP server with custom commands for
managing virtual users and handling CSV file uploads.
This module implements an FTP server with custom commands for
managing virtual users and handling CSV file uploads.
"""
import os
import logging
import os
from hashlib import sha256
from pathlib import Path
from pyftpdlib.authorizers import AuthenticationFailed, DummyAuthorizer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.servers import FTPServer
from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed
from utils.config import loader_ftp_csv as setting
from utils.connect import file_management, user_admin
from utils.database.connection import connetti_db
from utils.connect import user_admin, file_management
# Configure logging (moved inside main function)
@@ -33,46 +32,37 @@ class DummySha256Authorizer(DummyAuthorizer):
cfg: The configuration object.
"""
super().__init__()
self.add_user(
cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3]
)
self.add_user(cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3])
# Define the database connection
conn = connetti_db(cfg)
# Create a cursor
cur = conn.cursor()
cur.execute(
f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL"
)
cur.execute(f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL")
for ftpuser, user_hash, virtpath, perm in cur.fetchall():
# Create the user's directory if it does not exist.
try:
Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
self.add_user(ftpuser, user_hash, virtpath, perm)
except Exception as e: # pylint: disable=broad-except
except Exception as e: # pylint: disable=broad-except
self.responde(f"551 Error in create virtual user path: {e}")
def validate_authentication(
self: object, username: str, password: str, handler: object
) -> None:
def validate_authentication(self: object, username: str, password: str, handler: object) -> None:
# Validate the user's password against the stored user_hash
user_hash = sha256(password.encode("UTF-8")).hexdigest()
try:
if self.user_table[username]["pwd"] != user_hash:
raise KeyError
except KeyError:
raise AuthenticationFailed
raise AuthenticationFailed # noqa: B904
class ASEHandler(FTPHandler):
"""Custom FTP handler that extends FTPHandler with custom commands and file handling."""
def __init__(
self: object, conn: object, server: object, ioloop: object = None
) -> None:
def __init__(self: object, conn: object, server: object, ioloop: object = None) -> None:
"""Initializes the handler, adds custom commands, and sets up command permissions.
Args:
@@ -85,42 +75,42 @@ class ASEHandler(FTPHandler):
# Add custom FTP commands for managing virtual users - command in lowercase
self.proto_cmds.update(
{
"SITE ADDU": dict(
perm="M",
auth=True,
arg=True,
help="Syntax: SITE <SP> ADDU USERNAME PASSWORD (add virtual user).",
)
"SITE ADDU": {
"perm": "M",
"auth": True,
"arg": True,
"help": "Syntax: SITE <SP> ADDU USERNAME PASSWORD (add virtual user).",
}
}
)
self.proto_cmds.update(
{
"SITE DISU": dict(
perm="M",
auth=True,
arg=True,
help="Syntax: SITE <SP> DISU USERNAME (disable virtual user).",
)
"SITE DISU": {
"perm": "M",
"auth": True,
"arg": True,
"help": "Syntax: SITE <SP> DISU USERNAME (disable virtual user).",
}
}
)
self.proto_cmds.update(
{
"SITE ENAU": dict(
perm="M",
auth=True,
arg=True,
help="Syntax: SITE <SP> ENAU USERNAME (enable virtual user).",
)
"SITE ENAU": {
"perm": "M",
"auth": True,
"arg": True,
"help": "Syntax: SITE <SP> ENAU USERNAME (enable virtual user).",
}
}
)
self.proto_cmds.update(
{
"SITE LSTU": dict(
perm="M",
auth=True,
arg=None,
help="Syntax: SITE <SP> LSTU (list virtual users).",
)
"SITE LSTU": {
"perm": "M",
"auth": True,
"arg": None,
"help": "Syntax: SITE <SP> LSTU (list virtual users).",
}
}
)

View File

@@ -3,29 +3,22 @@
Script per prelevare dati da MySQL e inviare comandi SITE FTP
"""
from ftplib import FTP
import logging
import sys
from typing import List, Tuple
import mysql.connector
from utils.database.connection import connetti_db
from utils.config import users_loader as setting
from ftplib import FTP
import mysql.connector
from utils.config import users_loader as setting
from utils.database.connection import connetti_db
# Configurazione logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Configurazione server FTP
FTP_CONFIG = {
'host': 'localhost',
'user': 'admin',
'password': 'batt1l0',
'port': 2121
}
FTP_CONFIG = {"host": "localhost", "user": "admin", "password": "batt1l0", "port": 2121}
def connect_ftp() -> FTP:
"""
@@ -35,15 +28,16 @@ def connect_ftp() -> FTP:
"""
try:
ftp = FTP()
ftp.connect(FTP_CONFIG['host'], FTP_CONFIG['port'])
ftp.login(FTP_CONFIG['user'], FTP_CONFIG['password'])
ftp.connect(FTP_CONFIG["host"], FTP_CONFIG["port"])
ftp.login(FTP_CONFIG["user"], FTP_CONFIG["password"])
logger.info("Connessione FTP stabilita")
return ftp
except Exception as e: # pylint: disable=broad-except
except Exception as e: # pylint: disable=broad-except
logger.error("Errore connessione FTP: %s", e)
sys.exit(1)
def fetch_data_from_db(connection: mysql.connector.MySQLConnection) -> List[Tuple]:
def fetch_data_from_db(connection: mysql.connector.MySQLConnection) -> list[tuple]:
"""
Fetches username and password data from the 'ftp_accounts' table in the database.
@@ -73,6 +67,7 @@ def fetch_data_from_db(connection: mysql.connector.MySQLConnection) -> List[Tupl
finally:
cursor.close()
def send_site_command(ftp: FTP, command: str) -> bool:
"""
Sends a SITE command to the FTP server.
@@ -88,10 +83,11 @@ def send_site_command(ftp: FTP, command: str) -> bool:
response = ftp.sendcmd(f"SITE {command}")
logger.info("Comando SITE %s inviato. Risposta: %s", command, response)
return True
except Exception as e: # pylint: disable=broad-except
except Exception as e: # pylint: disable=broad-except
logger.error("Errore invio comando SITE %s: %s", command, e)
return False
def main():
"""
Main function to connect to the database, fetch FTP user data, and send SITE ADDU commands to the FTP server.
@@ -119,7 +115,7 @@ def main():
username, password = row
# Costruisci il comando SITE completo
ftp_site_command = f'addu {username} {password}'
ftp_site_command = f"addu {username} {password}"
logger.info("Sending ftp command: %s", ftp_site_command)
@@ -131,7 +127,7 @@ def main():
logger.info("Elaborazione completata. Successi: %s, Errori: %s", success_count, error_count)
except Exception as e: # pylint: disable=broad-except
except Exception as e: # pylint: disable=broad-except
logger.error("Errore generale: %s", e)
finally:
@@ -139,14 +135,15 @@ def main():
try:
ftp_connection.quit()
logger.info("Connessione FTP chiusa")
except Exception as e: # pylint: disable=broad-except
except Exception as e: # pylint: disable=broad-except
logger.error("Errore chiusura connessione FTP: %s", e)
try:
db_connection.close()
logger.info("Connessione MySQL chiusa")
except Exception as e: # pylint: disable=broad-except
except Exception as e: # pylint: disable=broad-except
logger.error("Errore chiusura connessione MySQL: %s", e)
if __name__ == "__main__":
main()
main()

View File

@@ -4,14 +4,14 @@ Orchestratore dei worker che caricano i dati su dataraw
"""
# Import necessary libraries
import logging
import importlib
import asyncio
import importlib
import logging
# Import custom modules for configuration and database connection
from utils.config import loader_load_data as setting
from utils.database import WorkflowFlags
from utils.csv.loaders import get_next_csv_atomic
from utils.database import WorkflowFlags
from utils.orchestrator_utils import run_orchestrator, worker_context
# Initialize the logger for this module
@@ -58,7 +58,7 @@ async def worker(worker_id: int, cfg: dict, pool: object) -> None:
logger.info("Nessun record disponibile")
await asyncio.sleep(NO_RECORD_SLEEP)
except Exception as e: # pylint: disable=broad-except
except Exception as e: # pylint: disable=broad-except
logger.error("Errore durante l'esecuzione: %s", e, exc_info=1)
await asyncio.sleep(1)
@@ -79,9 +79,7 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
logger.debug("Inizio ricerca nuovo CSV da elaborare")
rec_id, unit_type, tool_type, unit_name, tool_name = [
x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record
]
rec_id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
logger.info(
"Trovato CSV da elaborare: ID=%s, Tipo=%s_%s, Nome=%s_%s",
rec_id,
@@ -118,7 +116,7 @@ async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
return False
# Ottiene la funzione 'main_loader' dal modulo
funzione = getattr(modulo, "main_loader")
funzione = modulo.main_loader
# Esegui la funzione
logger.info("Elaborazione con modulo %s per ID=%s", modulo, rec_id)

View File

@@ -4,16 +4,16 @@ Orchestratore dei worker che inviano i dati ai clienti
"""
# Import necessary libraries
import logging
import asyncio
import logging
# Import custom modules for configuration and database connection
from utils.config import loader_send_data as setting
from utils.database import WorkflowFlags
from utils.csv.loaders import get_next_csv_atomic
from utils.orchestrator_utils import run_orchestrator, worker_context
from utils.connect.send_data import process_workflow_record
from utils.csv.loaders import get_next_csv_atomic
from utils.database import WorkflowFlags
from utils.general import alterna_valori
from utils.orchestrator_utils import run_orchestrator, worker_context
# from utils.ftp.send_data import ftp_send_elab_csv_to_customer, api_send_elab_csv_to_customer, \
# ftp_send_raw_csv_to_customer, api_send_raw_csv_to_customer

View File

@@ -1,9 +1,10 @@
"""set configurations
"""set configurations"""
"""
from configparser import ConfigParser
from . import ENV_PARENT_PATH
class Config:
def __init__(self):
c = ConfigParser()
@@ -22,4 +23,3 @@ class Config:
self.smtp_port = c.getint("smtp", "port")
self.smtp_user = c.get("smtp", "user")
self.smtp_passwd = c.get("smtp", "password")

View File

@@ -1,9 +1,10 @@
"""set configurations
"""set configurations"""
"""
from configparser import ConfigParser
from . import ENV_PARENT_PATH
class Config:
def __init__(self):
"""
@@ -40,7 +41,6 @@ class Config:
self.dbname = c.get("db", "dbName")
self.max_retries = c.getint("db", "maxRetries")
# Tables
self.dbusertable = c.get("tables", "userTableName")
self.dbrectable = c.get("tables", "recTableName")
@@ -49,30 +49,24 @@ class Config:
self.dbnodes = c.get("tables", "nodesTableName")
# unit setting
self.units_name = [part for part in c.get("unit", "Names").split('|')]
self.units_type = [part for part in c.get("unit", "Types").split('|')]
self.units_alias = {
key: value
for item in c.get("unit", "Alias").split('|')
for key, value in [item.split(':', 1)]
}
#self.units_header = {key: int(value) for pair in c.get("unit", "Headers").split('|') for key, value in [pair.split(':')]}
self.units_name = [part for part in c.get("unit", "Names").split("|")]
self.units_type = [part for part in c.get("unit", "Types").split("|")]
self.units_alias = {key: value for item in c.get("unit", "Alias").split("|") for key, value in [item.split(":", 1)]}
# self.units_header = {key: int(value) for pair in c.get("unit", "Headers").split('|') for key, value in [pair.split(':')]}
# tool setting
self.tools_name = [part for part in c.get("tool", "Names").split('|')]
self.tools_type = [part for part in c.get("tool", "Types").split('|')]
self.tools_name = [part for part in c.get("tool", "Names").split("|")]
self.tools_type = [part for part in c.get("tool", "Types").split("|")]
self.tools_alias = {
key: key if value == '=' else value
for item in c.get("tool", "Alias").split('|')
for key, value in [item.split(':', 1)]
key: key if value == "=" else value for item in c.get("tool", "Alias").split("|") for key, value in [item.split(":", 1)]
}
# csv info
self.csv_infos = [part for part in c.get("csv", "Infos").split('|')]
self.csv_infos = [part for part in c.get("csv", "Infos").split("|")]
# TS pini path match
self.ts_pini_path_match = {
key: key[1:-1] if value == '=' else value
for item in c.get("ts_pini", "path_match").split('|')
for key, value in [item.split(':', 1)]
key: key[1:-1] if value == "=" else value
for item in c.get("ts_pini", "path_match").split("|")
for key, value in [item.split(":", 1)]
}

View File

@@ -1,9 +1,10 @@
"""set configurations
"""set configurations"""
"""
from configparser import ConfigParser
from . import ENV_PARENT_PATH
class Config:
def __init__(self):
"""

View File

@@ -1,9 +1,10 @@
"""set configurations
"""set configurations"""
"""
from configparser import ConfigParser
from . import ENV_PARENT_PATH
class Config:
def __init__(self):
"""
@@ -36,11 +37,11 @@ class Config:
self.dbnodes = c.get("tables", "nodesTableName")
# Tool
self.elab_status = [part for part in c.get("tool", "elab_status").split('|')]
self.elab_status = [part for part in c.get("tool", "elab_status").split("|")]
# Matlab
self.matlab_runtime = c.get("matlab", "runtime")
self.matlab_func_path = c.get("matlab", "func_path")
self.matlab_timeout = c.getint("matlab", "timeout")
self.matlab_error = c.get("matlab", "error")
self.matlab_error_path = c.get("matlab", "error_path")
self.matlab_error_path = c.get("matlab", "error_path")

View File

@@ -1,9 +1,10 @@
"""set configurations
"""set configurations"""
"""
from configparser import ConfigParser
from . import ENV_PARENT_PATH
class Config:
def __init__(self):
"""

View File

@@ -1,15 +1,16 @@
"""set configurations
"""set configurations"""
"""
from configparser import ConfigParser
from . import ENV_PARENT_PATH
class Config:
"""
Handles configuration loading for database settings to load ftp users.
"""
def __init__(self):
def __init__(self):
c = ConfigParser()
c.read([f"{ENV_PARENT_PATH}/env/db.ini"])

View File

@@ -1,14 +1,16 @@
import os
from datetime import datetime
import logging
import os
import re
from datetime import datetime
import mysql.connector
from utils.database.connection import connetti_db
from utils.csv.parser import extract_value
from utils.database.connection import connetti_db
logger = logging.getLogger(__name__)
def on_file_received(self: object, file: str) -> None:
"""
Processes a received file, extracts relevant information, and inserts it into the database.
@@ -22,7 +24,7 @@ def on_file_received(self: object, file: str) -> None:
if not os.stat(file).st_size:
os.remove(file)
logger.info(f'File {file} is empty: removed.')
logger.info(f"File {file} is empty: removed.")
else:
cfg = self.cfg
path, filenameExt = os.path.split(file)
@@ -30,8 +32,8 @@ def on_file_received(self: object, file: str) -> None:
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
new_filename = f"{filename}_{timestamp}{fileExtension}"
os.rename(file, f"{path}/{new_filename}")
if (fileExtension.upper() in (cfg.fileext)):
with open(f"{path}/{new_filename}", 'r', encoding='utf-8', errors='ignore') as csvfile:
if fileExtension.upper() in (cfg.fileext):
with open(f"{path}/{new_filename}", encoding="utf-8", errors="ignore") as csvfile:
lines = csvfile.readlines()
unit_name = extract_value(cfg.units_name, filename, str(lines[0:10]))
@@ -42,50 +44,57 @@ def on_file_received(self: object, file: str) -> None:
# se esiste l'alias in alias_unit_type, allora prende il valore dell'alias... verifica sia lo unit_type completo che i primi 3 caratteri per CO_xxxxx
upper_unit_type = unit_type.upper()
unit_type = cfg.units_alias.get(upper_unit_type) or \
cfg.units_alias.get(upper_unit_type[:3]) or \
upper_unit_type
unit_type = cfg.units_alias.get(upper_unit_type) or cfg.units_alias.get(upper_unit_type[:3]) or upper_unit_type
upper_tool_type = tool_type.upper()
tool_type = cfg.tools_alias.get(upper_tool_type) or \
cfg.tools_alias.get(upper_tool_type[:3]) or \
upper_tool_type
tool_type = cfg.tools_alias.get(upper_tool_type) or cfg.tools_alias.get(upper_tool_type[:3]) or upper_tool_type
try:
conn = connetti_db(cfg)
except mysql.connector.Error as e:
logger.error(f'{e}')
logger.error(f"{e}")
# Create a cursor
cur = conn.cursor()
# da estrarre in un modulo
if (unit_type.upper() == "ISI CSV LOG" and tool_type.upper() == "VULINK" ):
serial_number = filename.split('_')[0]
if unit_type.upper() == "ISI CSV LOG" and tool_type.upper() == "VULINK":
serial_number = filename.split("_")[0]
tool_info = f'{{"serial_number": {serial_number}}}'
try:
cur.execute(f"SELECT unit_name, tool_name FROM {cfg.dbname}.vulink_tools WHERE serial_number = '{serial_number}'")
unit_name, tool_name = cur.fetchone()
except Exception as e:
logger.warning(f'{tool_type} serial number {serial_number} not found in table vulink_tools. {e}')
logger.warning(f"{tool_type} serial number {serial_number} not found in table vulink_tools. {e}")
# da estrarre in un modulo
if (unit_type.upper() == "STAZIONETOTALE" and tool_type.upper() == "INTEGRITY MONITOR" ):
if unit_type.upper() == "STAZIONETOTALE" and tool_type.upper() == "INTEGRITY MONITOR":
escaped_keys = [re.escape(key) for key in cfg.ts_pini_path_match.keys()]
stazione = extract_value(escaped_keys, filename)
if stazione:
tool_info = f'{{"Stazione": "{cfg.ts_pini_path_match.get(stazione)}"}}'
try:
cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info) VALUES (%s,%s, %s, %s, %s, %s, %s, %s)", (self.username, new_filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), ''.join(lines), tool_info))
cur.execute(
f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (username, filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info) VALUES (%s,%s, %s, %s, %s, %s, %s, %s)",
(
self.username,
new_filename,
unit_name.upper(),
unit_type.upper(),
tool_name.upper(),
tool_type.upper(),
"".join(lines),
tool_info,
),
)
conn.commit()
conn.close()
except Exception as e:
logger.error(f'File {new_filename} not loaded. Held in user path.')
logger.error(f'{e}')
logger.error(f"File {new_filename} not loaded. Held in user path.")
logger.error(f"{e}")
"""
else:
os.remove(file)
logger.info(f'File {new_filename} removed.')
"""
"""

View File

@@ -1,22 +1,23 @@
import logging
from datetime import datetime
from ftplib import FTP, FTP_TLS, all_errors
from io import BytesIO
import logging
import aiomysql
from datetime import datetime
from utils.database.loader_action import update_status, unlock
from utils.database.action_query import get_data_as_csv, get_tool_info, get_elab_timestamp
import aiomysql
from utils.database import WorkflowFlags
from utils.database.action_query import get_data_as_csv, get_elab_timestamp, get_tool_info
from utils.database.loader_action import unlock, update_status
logger = logging.getLogger(__name__)
class FTPConnection:
"""
Manages an FTP or FTP_TLS connection, providing a context manager for automatic disconnection.
"""
def __init__(self, host, port=21, use_tls=False, user='', passwd='',
passive=True, timeout=None, debug=0, context=None):
def __init__(self, host, port=21, use_tls=False, user="", passwd="", passive=True, timeout=None, debug=0, context=None):
self.use_tls = use_tls
if use_tls:
@@ -44,10 +45,12 @@ class FTPConnection:
def __exit__(self, exc_type, exc_val, exc_tb):
self.ftp.quit()
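
A rough usage sketch of the context manager (host, credentials and file name below are placeholders, not values from the repo): the connection is opened on entry and quit() is called automatically on exit, mirroring how ftp_send_elab_csv_to_customer uses it further down.

from io import BytesIO

buf = BytesIO(b"timestamp;value\n")  # illustrative payload
with FTPConnection(host="ftp.example.com", port=21, use_tls=False, user="user", passwd="secret", passive=True) as ftp:
    ftp.cwd("/incoming")  # move to the target directory
    reply = ftp.storbinary("STOR data.csv", buf)  # final server reply, e.g. a "226 ..." string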
async def ftp_send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, pool: object) -> bool:
None
return True
async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
"""
Sends elaborated CSV data to a customer via FTP.
@@ -81,26 +84,32 @@ async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str
try:
# Converti in bytes
csv_bytes = csv_data.encode('utf-8')
csv_bytes = csv_data.encode("utf-8")
csv_buffer = BytesIO(csv_bytes)
ftp_parms = await parse_ftp_parms(send_ftp_info["ftp_parm"])
use_tls = 'ssl_version' in ftp_parms
passive = ftp_parms.get('passive', True)
port = ftp_parms.get('port', 21)
use_tls = "ssl_version" in ftp_parms
passive = ftp_parms.get("passive", True)
port = ftp_parms.get("port", 21)
# Connessione FTP
with FTPConnection(host=send_ftp_info["ftp_addrs"], port=port, use_tls=use_tls, user=send_ftp_info["ftp_user"], passwd=send_ftp_info["ftp_passwd"], passive=passive) as ftp:
with FTPConnection(
host=send_ftp_info["ftp_addrs"],
port=port,
use_tls=use_tls,
user=send_ftp_info["ftp_user"],
passwd=send_ftp_info["ftp_passwd"],
passive=passive,
) as ftp:
# Cambia directory
if send_ftp_info["ftp_target"] != "/":
ftp.cwd(send_ftp_info["ftp_target"])
# Invia il file
result = ftp.storbinary(f'STOR {send_ftp_info["ftp_filename"]}', csv_buffer)
result = ftp.storbinary(f"STOR {send_ftp_info['ftp_filename']}", csv_buffer)
if result.startswith('226'):
logger.info(f"File {send_ftp_info["ftp_filename"]} inviato con successo")
if result.startswith("226"):
logger.info(f"File {send_ftp_info['ftp_filename']} inviato con successo")
return True
else:
logger.error(f"Errore nell'invio: {result}")
@@ -115,6 +124,7 @@ async def ftp_send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str
finally:
csv_buffer.close()
async def parse_ftp_parms(ftp_parms: str) -> dict:
"""
Parses a string of FTP parameters into a dictionary.
@@ -127,19 +137,19 @@ async def parse_ftp_parms(ftp_parms: str) -> dict:
dict: A dictionary where keys are parameter names (lowercase) and values are their parsed values.
"""
# Rimuovere spazi e dividere per virgola
pairs = ftp_parms.split(',')
pairs = ftp_parms.split(",")
result = {}
for pair in pairs:
if '=>' in pair:
key, value = pair.split('=>', 1)
if "=>" in pair:
key, value = pair.split("=>", 1)
key = key.strip().lower()
value = value.strip().lower()
# Convertire i valori appropriati
if value.isdigit():
value = int(value)
elif value == '':
elif value == "":
value = None
result[key] = value
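
For illustration (the parameter string here is hypothetical, not taken from the repo), the parser turns the "=>"-separated pairs into typed values; inside an async function:

parms = await parse_ftp_parms("port => 2121, passive => 1, ssl_version =>")
# parms == {"port": 2121, "passive": 1, "ssl_version": None}
use_tls = "ssl_version" in parms  # True: the key is present even though its value is None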
@@ -158,10 +168,7 @@ async def process_workflow_record(record: tuple, fase: int, cfg: dict, pool: obj
pool: Pool di connessioni al database
"""
# Estrazione e normalizzazione dei dati del record
id, unit_type, tool_type, unit_name, tool_name = [
x.lower().replace(" ", "_") if isinstance(x, str) else x
for x in record
]
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
try:
# Recupero informazioni principali
@@ -171,15 +178,15 @@ async def process_workflow_record(record: tuple, fase: int, cfg: dict, pool: obj
# Verifica se il processing può essere eseguito
if not _should_process(tool_elab_info, timestamp_matlab_elab):
logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: "
"invio dati non eseguito - due date raggiunta.")
logger.info(
f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: invio dati non eseguito - due date raggiunta."
)
await update_status(cfg, id, fase, pool)
return
# Routing basato sulla fase
success = await _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool)
success = await _route_by_phase(fase, tool_elab_info, cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
if success:
await update_status(cfg, id, fase, pool)
@@ -207,7 +214,7 @@ def _should_process(tool_elab_info: dict, timestamp_matlab_elab: datetime) -> bo
duedate = tool_elab_info.get("duedate")
# Se non c'è duedate o è vuota/nulla, può essere processato
if not duedate or duedate in ('0000-00-00 00:00:00', ''):
if not duedate or duedate in ("0000-00-00 00:00:00", ""):
return True
# Se timestamp_matlab_elab è None/null, usa il timestamp corrente
@@ -215,18 +222,18 @@ def _should_process(tool_elab_info: dict, timestamp_matlab_elab: datetime) -> bo
# Converti duedate in datetime se è una stringa
if isinstance(duedate, str):
duedate = datetime.strptime(duedate, '%Y-%m-%d %H:%M:%S')
duedate = datetime.strptime(duedate, "%Y-%m-%d %H:%M:%S")
# Assicurati che comparison_timestamp sia datetime
if isinstance(comparison_timestamp, str):
comparison_timestamp = datetime.strptime(comparison_timestamp, '%Y-%m-%d %H:%M:%S')
comparison_timestamp = datetime.strptime(comparison_timestamp, "%Y-%m-%d %H:%M:%S")
return duedate > comparison_timestamp
async def _route_by_phase(fase: int, tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str,
timestamp_matlab_elab: datetime, pool: object) -> bool:
async def _route_by_phase(
fase: int, tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object
) -> bool:
"""
Routes the processing of a workflow record based on the current phase.
@@ -247,20 +254,19 @@ async def _route_by_phase(fase: int, tool_elab_info: dict, cfg: dict, id: int, u
bool: True if the data sending operation was successful or no action was needed, False otherwise.
"""
if fase == WorkflowFlags.SENT_ELAB_DATA:
return await _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name,
tool_name, timestamp_matlab_elab, pool)
return await _handle_elab_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
elif fase == WorkflowFlags.SENT_RAW_DATA:
return await _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name,
tool_name, pool)
return await _handle_raw_data_phase(tool_elab_info, cfg, id, unit_name, tool_name, pool)
else:
logger.info(f"id {id} - {unit_name} - {tool_name}: nessuna azione da eseguire.")
return True
async def _handle_elab_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str,
timestamp_matlab_elab: datetime, pool: object) -> bool:
async def _handle_elab_data_phase(
tool_elab_info: dict, cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object
) -> bool:
"""
Handles the phase of sending elaborated data.
@@ -281,14 +287,12 @@ async def _handle_elab_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit
bool: True if the data sending operation was successful or no action was needed, False otherwise.
"""
# FTP send per dati elaborati
if tool_elab_info.get('ftp_send'):
return await _send_elab_data_ftp(cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool)
if tool_elab_info.get("ftp_send"):
return await _send_elab_data_ftp(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
# API send per dati elaborati
elif _should_send_elab_api(tool_elab_info):
return await _send_elab_data_api(cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool)
return await _send_elab_data_api(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
return True
@@ -313,9 +317,8 @@ async def _handle_raw_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit_
bool: True if the data sending operation was successful or no action was needed, False otherwise.
"""
# FTP send per dati raw
if tool_elab_info.get('ftp_send_raw'):
if tool_elab_info.get("ftp_send_raw"):
return await _send_raw_data_ftp(cfg, id, unit_name, tool_name, pool)
# API send per dati raw
@@ -327,16 +330,16 @@ async def _handle_raw_data_phase(tool_elab_info: dict, cfg: dict, id: int, unit_
def _should_send_elab_api(tool_elab_info: dict) -> bool:
"""Verifica se i dati elaborati devono essere inviati via API."""
return (tool_elab_info.get('inoltro_api') and
tool_elab_info.get('api_send') and
tool_elab_info.get('inoltro_api_url', '').strip())
return tool_elab_info.get("inoltro_api") and tool_elab_info.get("api_send") and tool_elab_info.get("inoltro_api_url", "").strip()
def _should_send_raw_api(tool_elab_info: dict) -> bool:
"""Verifica se i dati raw devono essere inviati via API."""
return (tool_elab_info.get('inoltro_api_raw') and
tool_elab_info.get('api_send_raw') and
tool_elab_info.get('inoltro_api_url_raw', '').strip())
return (
tool_elab_info.get("inoltro_api_raw")
and tool_elab_info.get("api_send_raw")
and tool_elab_info.get("inoltro_api_url_raw", "").strip()
)
async def _send_elab_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str, timestamp_matlab_elab: datetime, pool: object) -> bool:
@@ -358,8 +361,7 @@ async def _send_elab_data_ftp(cfg: dict, id: int, unit_name: str, tool_name: str
bool: True if the FTP sending was successful, False otherwise.
"""
try:
elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool)
elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
if not elab_csv:
return False
@@ -395,8 +397,7 @@ async def _send_elab_data_api(cfg: dict, id: int, unit_name: str, tool_name: str
bool: True if the API sending was successful, False otherwise.
"""
try:
elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name,
timestamp_matlab_elab, pool)
elab_csv = await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool)
if not elab_csv:
return False
@@ -470,4 +471,4 @@ async def _send_raw_data_api(cfg: dict, id: int, unit_name: str, tool_name: str,
except Exception as e:
logger.error(f"Errore invio API raw data id {id}: {e}")
return False
return False

View File

@@ -1,11 +1,13 @@
import smtplib
import logging
import smtplib
from email.message import EmailMessage
from utils.config import loader_email as setting
cfg = setting.Config()
logger = logging.getLogger(__name__)
async def send_error_email(unit_name: str, tool_name: str, matlab_cmd: str, matlab_error: str, errors: list, warnings: list) -> None:
"""
Sends an error email containing details about a MATLAB processing failure.
@@ -24,24 +26,33 @@ async def send_error_email(unit_name: str, tool_name: str, matlab_cmd: str, matl
# Creazione dell'oggetto messaggio
msg = EmailMessage()
msg['Subject'] = cfg.subject
msg['From'] = cfg.from_addr
msg['To'] = cfg.to_addr
msg['Cc'] = cfg.cc_addr
msg['Bcc'] = cfg.bcc_addr
msg["Subject"] = cfg.subject
msg["From"] = cfg.from_addr
msg["To"] = cfg.to_addr
msg["Cc"] = cfg.cc_addr
msg["Bcc"] = cfg.bcc_addr
MatlabErrors = "<br/>".join(errors)
MatlabWarnings = "<br/>".join(dict.fromkeys(warnings))
# Imposta il contenuto del messaggio come HTML
msg.add_alternative(cfg.body.format(unit=unit_name, tool=tool_name, matlab_cmd=matlab_cmd, matlab_error=matlab_error,
MatlabErrors=MatlabErrors, MatlabWarnings=MatlabWarnings), subtype='html')
msg.add_alternative(
cfg.body.format(
unit=unit_name,
tool=tool_name,
matlab_cmd=matlab_cmd,
matlab_error=matlab_error,
MatlabErrors=MatlabErrors,
MatlabWarnings=MatlabWarnings,
),
subtype="html",
)
try:
# Connessione al server SMTP
with smtplib.SMTP(cfg.smtp_addr, cfg.smtp_port) as server:
server.starttls() # Avvia la crittografia TLS per una connessione sicura
server.login(cfg.smtp_user, cfg.smtp_passwd) # Autenticazione con il server
server.login(cfg.smtp_user, cfg.smtp_passwd) # Autenticazione con il server
server.send_message(msg) # Invio dell'email
logger.info("Email inviata con successo!")
except Exception as e:
logger.error(f"Errore durante l'invio dell'email: {e}")
logger.error(f"Errore durante l'invio dell'email: {e}")

View File

@@ -1,14 +1,15 @@
import os
import mysql.connector
import logging
import os
from hashlib import sha256
from pathlib import Path
import mysql.connector
from utils.database.connection import connetti_db
logger = logging.getLogger(__name__)
def ftp_SITE_ADDU(self: object, line: str) -> None:
"""
Adds a virtual user, creates their directory, and saves their details to the database.
@@ -22,38 +23,40 @@ def ftp_SITE_ADDU(self: object, line: str) -> None:
user = os.path.basename(parms[0]) # Extract the username
password = parms[1] # Get the password
hash = sha256(password.encode("UTF-8")).hexdigest() # Hash the password
except IndexError:
self.respond('501 SITE ADDU failed. Command needs 2 arguments')
except IndexError:
self.respond("501 SITE ADDU failed. Command needs 2 arguments")
else:
try:
# Create the user's directory
Path(cfg.virtpath + user).mkdir(parents=True, exist_ok=True)
except Exception as e:
self.respond(f'551 Error in create virtual user path: {e}')
self.respond(f"551 Error in create virtual user path: {e}")
else:
try:
# Add the user to the authorizer
self.authorizer.add_user(str(user),
hash, cfg.virtpath + "/" + user, perm=cfg.defperm)
self.authorizer.add_user(str(user), hash, cfg.virtpath + "/" + user, perm=cfg.defperm)
# Save the user to the database
# Define the database connection
try:
conn = connetti_db(cfg)
except mysql.connector.Error as e:
print(f"Error: {e}")
logger.error(f'{e}')
logger.error(f"{e}")
# Create a cursor
cur = conn.cursor()
cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('{user}', '{hash}', '{cfg.virtpath + user}', '{cfg.defperm}')")
cur.execute(
f"INSERT INTO {cfg.dbname}.{cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('{user}', '{hash}', '{cfg.virtpath + user}', '{cfg.defperm}')"
)
conn.commit()
conn.close()
logger.info(f"User {user} created.")
self.respond('200 SITE ADDU successful.')
self.respond("200 SITE ADDU successful.")
except Exception as e:
self.respond(f'501 SITE ADDU failed: {e}.')
self.respond(f"501 SITE ADDU failed: {e}.")
print(e)
def ftp_SITE_DISU(self: object, line: str) -> None:
"""
Removes a virtual user from the authorizer and marks them as deleted in the database.
@@ -72,7 +75,7 @@ def ftp_SITE_DISU(self: object, line: str) -> None:
conn = connetti_db(cfg)
except mysql.connector.Error as e:
print(f"Error: {e}")
logger.error(f'{e}')
logger.error(f"{e}")
# Crea un cursore
cur = conn.cursor()
@@ -81,11 +84,12 @@ def ftp_SITE_DISU(self: object, line: str) -> None:
conn.close()
logger.info(f"User {user} deleted.")
self.respond('200 SITE DISU successful.')
self.respond("200 SITE DISU successful.")
except Exception as e:
self.respond('501 SITE DISU failed.')
self.respond("501 SITE DISU failed.")
print(e)
def ftp_SITE_ENAU(self: object, line: str) -> None:
"""
Restores a virtual user by updating their status in the database and adding them back to the authorizer.
@@ -102,7 +106,7 @@ def ftp_SITE_ENAU(self: object, line: str) -> None:
conn = connetti_db(cfg)
except mysql.connector.Error as e:
print(f"Error: {e}")
logger.error(f'{e}')
logger.error(f"{e}")
# Crea un cursore
cur = conn.cursor()
@@ -118,18 +122,19 @@ def ftp_SITE_ENAU(self: object, line: str) -> None:
self.authorizer.add_user(ftpuser, hash, virtpath, perm)
try:
Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
except Exception as e:
self.responde(f'551 Error in create virtual user path: {e}')
except Exception as e:
self.responde(f"551 Error in create virtual user path: {e}")
conn.close()
logger.info(f"User {user} restored.")
self.respond('200 SITE ENAU successful.')
self.respond("200 SITE ENAU successful.")
except Exception as e:
self.respond('501 SITE ENAU failed.')
self.respond("501 SITE ENAU failed.")
print(e)
def ftp_SITE_LSTU(self: object, line: str) -> None:
"""
Lists all virtual users from the database.
@@ -145,15 +150,18 @@ def ftp_SITE_LSTU(self: object, line: str) -> None:
conn = connetti_db(cfg)
except mysql.connector.Error as e:
print(f"Error: {e}")
logger.error(f'{e}')
logger.error(f"{e}")
# Crea un cursore
cur = conn.cursor()
self.push("214-The following virtual users are defined:\r\n")
cur.execute(f'SELECT ftpuser, perm, disabled_at FROM {cfg.dbname}.{cfg.dbusertable}')
[users_list.append(f'Username: {ftpuser}\tPerms: {perm}\tDisabled: {disabled_at}\r\n') for ftpuser, perm, disabled_at in cur.fetchall()]
self.push(''.join(users_list))
cur.execute(f"SELECT ftpuser, perm, disabled_at FROM {cfg.dbname}.{cfg.dbusertable}")
[
users_list.append(f"Username: {ftpuser}\tPerms: {perm}\tDisabled: {disabled_at}\r\n")
for ftpuser, perm, disabled_at in cur.fetchall()
]
self.push("".join(users_list))
self.respond("214 LSTU SITE command successful.")
except Exception as e:
self.respond(f'501 list users failed: {e}')
except Exception as e:
self.respond(f"501 list users failed: {e}")

View File

@@ -1,15 +1,16 @@
#!.venv/bin/python
from utils.database.nodes_query import get_nodes_type
from utils.timestamp.date_check import normalizza_data, normalizza_orario
from utils.database.loader_action import find_nearest_timestamp
import logging
import re
from itertools import islice
from datetime import datetime, timedelta
from itertools import islice
from utils.database.loader_action import find_nearest_timestamp
from utils.database.nodes_query import get_nodes_type
from utils.timestamp.date_check import normalizza_data, normalizza_orario
logger = logging.getLogger(__name__)
async def get_data(cfg: object, id: int, pool: object) -> tuple:
"""
Retrieves unit name, tool name, and tool data for a given record ID from the database.
@@ -23,11 +24,12 @@ async def get_data(cfg: object, id: int, pool: object) -> tuple:
"""
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(f'select filename, unit_name, tool_name, tool_data from {cfg.dbrectable} where id = {id}')
await cur.execute(f"select filename, unit_name, tool_name, tool_data from {cfg.dbrectable} where id = {id}")
filename, unit_name, tool_name, tool_data = await cur.fetchone()
return filename, unit_name, tool_name, tool_data
async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list:
"""
Processes pipe-separated data from a CSV record into a structured matrix.
@@ -49,24 +51,35 @@ async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list:
che hanno il pattern '.-' perché sono letture con un numero errato - negativo dopo la virgola
che hanno il pattern 'File Creation' perché vuol dire che c'è stato un errore della centralina
"""
for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]:
timestamp, batlevel, temperature, rilevazioni = riga.split(';',3)
EventDate, EventTime = timestamp.split(' ')
if batlevel == '|':
for riga in [
riga
for riga in righe
if ";|;" in riga and "No RX" not in riga and ".-" not in riga and "File Creation" not in riga and riga.isprintable()
]:
timestamp, batlevel, temperature, rilevazioni = riga.split(";", 3)
EventDate, EventTime = timestamp.split(" ")
if batlevel == "|":
batlevel = temperature
temperature, rilevazioni = rilevazioni.split(';',1)
''' in alcune letture mancano temperatura e livello batteria'''
if temperature == '':
temperature, rilevazioni = rilevazioni.split(";", 1)
""" in alcune letture mancano temperatura e livello batteria"""
if temperature == "":
temperature = 0
if batlevel == '':
if batlevel == "":
batlevel = 0
valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;') # Toglie '|;' iniziali, toglie eventuali ';' finali, dividi per ';|;'
valori_nodi = (
rilevazioni.lstrip("|;").rstrip(";").split(";|;")
) # Toglie '|;' iniziali, toglie eventuali ';' finali, dividi per ';|;'
for num_nodo, valori_nodo in enumerate(valori_nodi, start=1):
valori = valori_nodo.split(';')
matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
valori = valori_nodo.split(";")
matrice_valori.append(
[UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature]
+ valori
+ ([None] * (19 - len(valori)))
)
return matrice_valori
async def make_ain_din_matrix(cfg: object, id: int, pool: object) -> list:
"""
Processes analog and digital input data from a CSV record into a structured matrix.
@@ -82,25 +95,34 @@ async def make_ain_din_matrix(cfg: object, id: int, pool: object) -> list:
node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
righe = ToolData.splitlines()
matrice_valori = []
pattern = r'^(?:\d{4}\/\d{2}\/\d{2}|\d{2}\/\d{2}\/\d{4}) \d{2}:\d{2}:\d{2}(?:;\d+\.\d+){2}(?:;\d+){4}$'
pattern = r"^(?:\d{4}\/\d{2}\/\d{2}|\d{2}\/\d{2}\/\d{4}) \d{2}:\d{2}:\d{2}(?:;\d+\.\d+){2}(?:;\d+){4}$"
if node_ains or node_dins:
for riga in [riga for riga in righe if re.match(pattern, riga)]:
timestamp, batlevel, temperature, analog_input1, analog_input2, digital_input1, digital_input2 = riga.split(';')
EventDate, EventTime = timestamp.split(' ')
timestamp, batlevel, temperature, analog_input1, analog_input2, digital_input1, digital_input2 = riga.split(";")
EventDate, EventTime = timestamp.split(" ")
if any(node_ains):
for node_num, analog_act in enumerate([analog_input1, analog_input2], start=1):
matrice_valori.append([UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + [analog_act] + ([None] * (19 - 1)))
matrice_valori.append(
[UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature]
+ [analog_act]
+ ([None] * (19 - 1))
)
else:
logger.info(f"Nessun Ingresso analogico per {UnitName} {ToolNameID}")
if any(node_dins):
start_node = 3 if any(node_ains) else 1
for node_num, digital_act in enumerate([digital_input1, digital_input2], start=start_node):
matrice_valori.append([UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + [digital_act] + ([None] * (19 - 1)))
matrice_valori.append(
[UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature]
+ [digital_act]
+ ([None] * (19 - 1))
)
else:
logger.info(f"Nessun Ingresso digitale per {UnitName} {ToolNameID}")
return matrice_valori
async def make_channels_matrix(cfg: object, id: int, pool: object) -> list:
"""
Processes channel-based data from a CSV record into a structured matrix.
@@ -116,19 +138,28 @@ async def make_channels_matrix(cfg: object, id: int, pool: object) -> list:
node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
righe = ToolData.splitlines()
matrice_valori = []
for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]:
timestamp, batlevel, temperature, rilevazioni = riga.replace(';|;',';').split(';',3)
EventDate, EventTime = timestamp.split(' ')
valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|']
for riga in [
riga
for riga in righe
if ";|;" in riga and "No RX" not in riga and ".-" not in riga and "File Creation" not in riga and riga.isprintable()
]:
timestamp, batlevel, temperature, rilevazioni = riga.replace(";|;", ";").split(";", 3)
EventDate, EventTime = timestamp.split(" ")
valori_splitted = [valore for valore in rilevazioni.split(";") if valore != "|"]
valori_iter = iter(valori_splitted)
valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels]
for num_nodo, valori in enumerate(valori_nodi, start=1):
matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
matrice_valori.append(
[UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature]
+ valori
+ ([None] * (19 - len(valori)))
)
return matrice_valori
async def make_musa_matrix(cfg: object, id: int, pool: object) -> list:
"""
Processes 'Musa' specific data from a CSV record into a structured matrix.
@@ -144,20 +175,28 @@ async def make_musa_matrix(cfg: object, id: int, pool: object) -> list:
node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
righe = ToolData.splitlines()
matrice_valori = []
for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]:
timestamp, batlevel, rilevazioni = riga.replace(';|;',';').split(';',2)
if timestamp == '':
for riga in [
riga
for riga in righe
if ";|;" in riga and "No RX" not in riga and ".-" not in riga and "File Creation" not in riga and riga.isprintable()
]:
timestamp, batlevel, rilevazioni = riga.replace(";|;", ";").split(";", 2)
if timestamp == "":
continue
EventDate, EventTime = timestamp.split(' ')
temperature = rilevazioni.split(';')[0]
logger.info(f'{temperature}, {rilevazioni}')
valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|']
EventDate, EventTime = timestamp.split(" ")
temperature = rilevazioni.split(";")[0]
logger.info(f"{temperature}, {rilevazioni}")
valori_splitted = [valore for valore in rilevazioni.split(";") if valore != "|"]
valori_iter = iter(valori_splitted)
valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels]
for num_nodo, valori in enumerate(valori_nodi, start=1):
matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
matrice_valori.append(
[UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature]
+ valori
+ ([None] * (19 - len(valori)))
)
return matrice_valori
@@ -178,17 +217,20 @@ async def make_tlp_matrix(cfg: object, id: int, pool: object) -> list:
valori_x_nodo = 2
matrice_valori = []
for riga in righe:
timestamp, batlevel, temperature, barometer, rilevazioni = riga.split(';',4)
EventDate, EventTime = timestamp.split(' ')
lista_rilevazioni = rilevazioni.strip(';').split(';')
timestamp, batlevel, temperature, barometer, rilevazioni = riga.split(";", 4)
EventDate, EventTime = timestamp.split(" ")
lista_rilevazioni = rilevazioni.strip(";").split(";")
lista_rilevazioni.append(barometer)
valori_nodi = [lista_rilevazioni[i:i + valori_x_nodo] for i in range(0, len(lista_rilevazioni), valori_x_nodo)]
valori_nodi = [lista_rilevazioni[i : i + valori_x_nodo] for i in range(0, len(lista_rilevazioni), valori_x_nodo)]
for num_nodo, valori in enumerate(valori_nodi, start=1):
matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
matrice_valori.append(
[UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature]
+ valori
+ ([None] * (19 - len(valori)))
)
return matrice_valori
async def make_gd_matrix(cfg: object, id: int, pool: object) -> list:
"""
Processes 'GD' specific data from a CSV record into a structured matrix.
@@ -203,34 +245,64 @@ async def make_gd_matrix(cfg: object, id: int, pool: object) -> list:
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
righe = ToolData.splitlines()
matrice_valori = []
pattern = r';-?\d+dB$'
for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]:
timestamp, rilevazioni = riga.split(';|;',1)
EventDate, EventTime = timestamp.split(' ')
#logger.debug(f"GD id {id}: {pattern} {rilevazioni}")
pattern = r";-?\d+dB$"
for riga in [
riga
for riga in righe
if ";|;" in riga and "No RX" not in riga and ".-" not in riga and "File Creation" not in riga and riga.isprintable()
]:
timestamp, rilevazioni = riga.split(";|;", 1)
EventDate, EventTime = timestamp.split(" ")
# logger.debug(f"GD id {id}: {pattern} {rilevazioni}")
if re.search(pattern, rilevazioni):
if len(matrice_valori) == 0:
matrice_valori.append(['RSSI'])
batlevel, temperature, rssi = rilevazioni.split(';')
#logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {rssi}")
matrice_valori.append(["RSSI"])
batlevel, temperature, rssi = rilevazioni.split(";")
# logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {rssi}")
gd_timestamp = datetime.strptime(f"{normalizza_data(EventDate)} {normalizza_orario(EventTime)}", "%Y-%m-%d %H:%M:%S")
start_timestamp = gd_timestamp - timedelta(seconds=45)
end_timestamp = gd_timestamp + timedelta(seconds=45)
matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 1, f"{start_timestamp:%Y-%m-%d %H:%M:%S}", f"{end_timestamp:%Y-%m-%d %H:%M:%S}", f"{gd_timestamp:%Y-%m-%d %H:%M:%S}", batlevel, temperature, int(rssi[:-2])])
matrice_valori.append(
[
UnitName,
ToolNameID.replace("GD", "DT"),
1,
f"{start_timestamp:%Y-%m-%d %H:%M:%S}",
f"{end_timestamp:%Y-%m-%d %H:%M:%S}",
f"{gd_timestamp:%Y-%m-%d %H:%M:%S}",
batlevel,
temperature,
int(rssi[:-2]),
]
)
elif all(char == ';' for char in rilevazioni):
elif all(char == ";" for char in rilevazioni):
pass
elif ';|;' in rilevazioni:
unit_metrics, data = rilevazioni.split(';|;')
batlevel, temperature = unit_metrics.split(';')
#logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {data}")
elif ";|;" in rilevazioni:
unit_metrics, data = rilevazioni.split(";|;")
batlevel, temperature = unit_metrics.split(";")
# logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {data}")
dt_timestamp, dt_batlevel, dt_temperature = await find_nearest_timestamp(cfg, {"timestamp": f"{normalizza_data(EventDate)} {normalizza_orario(EventTime)}", "unit": UnitName, "tool": ToolNameID.replace("GD", "DT"), "node_num": 1}, pool)
EventDate, EventTime = dt_timestamp.strftime('%Y-%m-%d %H:%M:%S').split(' ')
valori = data.split(';')
matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 2, EventDate, EventTime, float(dt_batlevel), float(dt_temperature)] + valori + ([None] * (16 - len(valori))) + [batlevel, temperature, None])
dt_timestamp, dt_batlevel, dt_temperature = await find_nearest_timestamp(
cfg,
{
"timestamp": f"{normalizza_data(EventDate)} {normalizza_orario(EventTime)}",
"unit": UnitName,
"tool": ToolNameID.replace("GD", "DT"),
"node_num": 1,
},
pool,
)
EventDate, EventTime = dt_timestamp.strftime("%Y-%m-%d %H:%M:%S").split(" ")
valori = data.split(";")
matrice_valori.append(
[UnitName, ToolNameID.replace("GD", "DT"), 2, EventDate, EventTime, float(dt_batlevel), float(dt_temperature)]
+ valori
+ ([None] * (16 - len(valori)))
+ [batlevel, temperature, None]
)
else:
logger.warning(f"GD id {id}: dati non trattati - {rilevazioni}")
return matrice_valori
return matrice_valori

View File

@@ -1,16 +1,23 @@
import asyncio
import tempfile
import os
from utils.database.loader_action import load_data, update_status, unlock
from utils.database import WorkflowFlags
from utils.csv.data_preparation import make_pipe_sep_matrix, make_ain_din_matrix, make_channels_matrix, make_tlp_matrix, make_gd_matrix, make_musa_matrix, get_data
import logging
import os
import tempfile
from utils.csv.data_preparation import (
get_data,
make_ain_din_matrix,
make_channels_matrix,
make_gd_matrix,
make_musa_matrix,
make_pipe_sep_matrix,
make_tlp_matrix,
)
from utils.database import WorkflowFlags
from utils.database.loader_action import load_data, unlock, update_status
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object, action: str) -> None:
"""
Main loader function to process CSV data based on the specified action.
@@ -27,7 +34,7 @@ async def main_loader(cfg: object, id: int, pool: object, action: str) -> None:
"channels": make_channels_matrix,
"tlp": make_tlp_matrix,
"gd": make_gd_matrix,
"musa": make_musa_matrix
"musa": make_musa_matrix,
}
if action in type_matrix_mapping:
function_to_call = type_matrix_mapping[action]
@@ -69,7 +76,8 @@ async def get_next_csv_atomic(pool: object, table_name: str, status: int, next_s
async with conn.cursor() as cur:
# Usa SELECT FOR UPDATE per lock atomico
await cur.execute(f"""
await cur.execute(
f"""
SELECT id, unit_type, tool_type, unit_name, tool_name
FROM {table_name}
WHERE locked = 0
@@ -78,15 +86,20 @@ async def get_next_csv_atomic(pool: object, table_name: str, status: int, next_s
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED
""", (status, status, next_status))
""",
(status, status, next_status),
)
result = await cur.fetchone()
if result:
await cur.execute(f"""
await cur.execute(
f"""
UPDATE {table_name}
SET locked = 1
WHERE id = %s
""", (result[0],))
""",
(result[0],),
)
# Explicit commit to release the lock
await conn.commit()
@@ -97,6 +110,7 @@ async def get_next_csv_atomic(pool: object, table_name: str, status: int, next_s
await conn.rollback()
raise e
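The claim above leans on InnoDB row locks; below is a hedged sketch of the same protocol against a hypothetical jobs table (aiomysql-style pool, invented column names), showing why SKIP LOCKED keeps concurrent workers from picking the same record.

async def claim_next_job(pool: object, done_flag: int) -> int | None:
    # Assumed schema: jobs(id, locked, status); not the project's real table.
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            try:
                await cur.execute(
                    """
                    SELECT id FROM jobs
                    WHERE locked = 0 AND (status & %s) = 0
                    ORDER BY id
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                    """,
                    (done_flag,),
                )
                row = await cur.fetchone()
                if row:
                    # mark the row as taken while the row lock is still held
                    await cur.execute("UPDATE jobs SET locked = 1 WHERE id = %s", (row[0],))
                await conn.commit()  # the commit releases the lock for other workers
                return row[0] if row else None
            except Exception:
                await conn.rollback()
                raise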
async def main_old_script_loader(cfg: object, id: int, pool: object, script_name: str) -> None:
"""
This function retrieves CSV data, writes it to a temporary file,
@@ -110,21 +124,19 @@ async def main_old_script_loader(cfg: object, id: int, pool: object, script_name
"""
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Create a temporary file
with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file:
with tempfile.NamedTemporaryFile(mode="w", prefix=filename, suffix=".csv", delete=False) as temp_file:
temp_file.write(ToolData)
temp_filename = temp_file.name
try:
# Use asyncio.subprocess for true async execution
process = await asyncio.create_subprocess_exec(
'python3', f'old_scripts/{script_name}.py', temp_filename,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
"python3", f"old_scripts/{script_name}.py", temp_filename, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
result_stdout = stdout.decode('utf-8')
result_stderr = stderr.decode('utf-8')
result_stdout = stdout.decode("utf-8")
result_stderr = stderr.decode("utf-8")
finally:
# Clean up the temporary file
@@ -138,4 +150,4 @@ async def main_old_script_loader(cfg: object, id: int, pool: object, script_name
logger.debug(f"Stdout: {result_stdout}")
await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await unlock(cfg, id, pool)
await unlock(cfg, id, pool)
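A minimal, self-contained sketch of the temp-file plus asyncio-subprocess pattern used by main_old_script_loader; the inline child command and CSV content are invented stand-ins for the legacy scripts.

import asyncio
import os
import tempfile

async def run_legacy(csv_text: str) -> str:
    with tempfile.NamedTemporaryFile(mode="w", suffix=".csv", delete=False) as tmp:
        tmp.write(csv_text)
        temp_filename = tmp.name
    try:
        proc = await asyncio.create_subprocess_exec(
            "python3", "-c", "import sys; print(open(sys.argv[1]).read().upper())", temp_filename,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await proc.communicate()
        return stdout.decode("utf-8")
    finally:
        os.unlink(temp_filename)  # clean up the temporary file

print(asyncio.run(run_legacy("a;b;c")))  # A;B;C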

View File

@@ -1,6 +1,7 @@
import re
def extract_value(patterns: list, primary_source: str, secondary_source: str = None, default: str='Not Defined') -> str:
def extract_value(patterns: list, primary_source: str, secondary_source: str = None, default: str = "Not Defined") -> str:
"""
Extracts a value from a given source (or sources) based on a list of regex patterns.
@@ -12,7 +13,8 @@ def extract_value(patterns: list, primary_source: str, secondary_source: str = N
Args:
patterns (list): A list of regular expression strings to search for.
primary_source (str): The main string to search within.
secondary_source (str, optional): An additional string to search within if no match is found in the primary source. Defaults to None.
secondary_source (str, optional): An additional string to search within if no match is found in the primary source.
Defaults to None.
default (str, optional): The value to return if no match is found. Defaults to 'Not Defined'.
Returns:
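A plausible, self-contained rendering of the fallback behaviour this docstring describes (patterns tried against the primary source first, then the secondary); it is an illustration, not necessarily the module's exact implementation.

import re

def extract_value_sketch(patterns: list, primary_source: str, secondary_source: str = None, default: str = "Not Defined") -> str:
    for source in (primary_source, secondary_source):
        if source is None:
            continue
        for pattern in patterns:
            match = re.search(pattern, source)
            if match:
                return match.group(1) if match.groups() else match.group(0)
    return default

print(extract_value_sketch([r"serial=(\w+)"], "name=foo", "serial=AB123"))  # AB123
print(extract_value_sketch([r"serial=(\w+)"], "name=foo"))                  # Not Defined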

View File

@@ -4,24 +4,25 @@ class WorkflowFlags:
Each flag is a power of 2, allowing them to be combined using bitwise operations
to represent multiple states simultaneously.
"""
CSV_RECEIVED = 0 # 0000
DATA_LOADED = 1 # 0001
START_ELAB = 2 # 0010
DATA_ELABORATED = 4 # 0100
SENT_RAW_DATA = 8 # 1000
SENT_ELAB_DATA = 16 # 10000
DUMMY_ELABORATED = 32 # 100000 (Used for testing or specific dummy elaborations)
CSV_RECEIVED = 0 # 0000
DATA_LOADED = 1 # 0001
START_ELAB = 2 # 0010
DATA_ELABORATED = 4 # 0100
SENT_RAW_DATA = 8 # 1000
SENT_ELAB_DATA = 16 # 10000
DUMMY_ELABORATED = 32 # 100000 (Used for testing or specific dummy elaborations)
# Mapping: flag -> timestamp column
FLAG_TO_TIMESTAMP = {
WorkflowFlags.CSV_RECEIVED: "inserted_at",
WorkflowFlags.DATA_LOADED: "loaded_at",
WorkflowFlags.START_ELAB: "start_elab_at",
WorkflowFlags.DATA_ELABORATED: "elaborated_at",
WorkflowFlags.SENT_RAW_DATA: "sent_raw_at",
WorkflowFlags.SENT_ELAB_DATA: "sent_elab_at",
WorkflowFlags.DUMMY_ELABORATED: "elaborated_at" # Shares the same timestamp column as DATA_ELABORATED
WorkflowFlags.DUMMY_ELABORATED: "elaborated_at", # Shares the same timestamp column as DATA_ELABORATED
}
"""
A dictionary mapping each WorkflowFlag to the corresponding database column
@@ -33,4 +34,4 @@ BATCH_SIZE = 1000
"""
The number of records to process in a single batch when loading data into the database.
This helps manage memory usage and improve performance for large datasets.
"""
"""

View File

@@ -1,18 +1,18 @@
import logging
import aiomysql
import csv
import logging
from io import StringIO
import aiomysql
from utils.database import WorkflowFlags
logger = logging.getLogger(__name__)
sub_select = {
WorkflowFlags.DATA_ELABORATED:
"""m.matcall, s.`desc` AS statustools""",
WorkflowFlags.SENT_RAW_DATA:
"""t.ftp_send, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, s.`desc` AS statustools, IFNULL(u.duedate, "") AS duedate""",
WorkflowFlags.SENT_ELAB_DATA:
"""t.ftp_send_raw, IFNULL(u.ftp_mode_raw, "") AS ftp_mode_raw,
WorkflowFlags.DATA_ELABORATED: """m.matcall, s.`desc` AS statustools""",
WorkflowFlags.SENT_RAW_DATA: """t.ftp_send, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token,
s.`desc` AS statustools, IFNULL(u.duedate, "") AS duedate""",
WorkflowFlags.SENT_ELAB_DATA: """t.ftp_send_raw, IFNULL(u.ftp_mode_raw, "") AS ftp_mode_raw,
IFNULL(u.ftp_addrs_raw, "") AS ftp_addrs_raw, IFNULL(u.ftp_user_raw, "") AS ftp_user_raw,
IFNULL(u.ftp_passwd_raw, "") AS ftp_passwd_raw, IFNULL(u.ftp_filename_raw, "") AS ftp_filename_raw,
IFNULL(u.ftp_parm_raw, "") AS ftp_parm_raw, IFNULL(u.ftp_target_raw, "") AS ftp_target_raw,
@@ -20,8 +20,9 @@ sub_select = {
IFNULL(u.inoltro_api_url_raw, "") AS inoltro_api_url_raw,
IFNULL(u.inoltro_api_bearer_token_raw, "") AS inoltro_api_bearer_token_raw,
t.api_send_raw, IFNULL(u.duedate, "") AS duedate
"""
}
""",
}
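A toy sketch of the lookup-then-interpolate idea behind sub_select: pick the column fragment for a workflow flag and splice it into a query skeleton (flag value, table and columns below are invented).

SENT_RAW_DATA = 8
sub_select_demo = {SENT_RAW_DATA: "t.ftp_send, t.api_send"}

def build_query(flag: int) -> str:
    return f"SELECT {sub_select_demo[flag]} FROM tools AS t WHERE t.name = %s"

print(build_query(SENT_RAW_DATA))  # SELECT t.ftp_send, t.api_send FROM tools AS t WHERE t.name = %s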
async def get_tool_info(next_status: int, unit: str, tool: str, pool: object) -> tuple:
"""
@@ -89,7 +90,8 @@ async def get_data_as_csv(cfg: dict, id_recv: int, unit: str, tool: str, matlab_
select * from (
select 'ToolNameID', 'EventDate', 'EventTime', 'NodeNum', 'NodeType', 'NodeDepth',
'XShift', 'YShift', 'ZShift' , 'X', 'Y', 'Z', 'HShift', 'HShiftDir', 'HShift_local',
'speed', 'speed_local', 'acceleration', 'acceleration_local', 'T_node', 'water_level', 'pressure', 'load_value', 'AlfaX', 'AlfaY', 'CalcErr'
'speed', 'speed_local', 'acceleration', 'acceleration_local', 'T_node', 'water_level',
'pressure', 'load_value', 'AlfaX', 'AlfaY', 'CalcErr'
union all
select ToolNameID, EventDate, EventTime, NodeNum, NodeType, NodeDepth,
XShift, YShift, ZShift , X, Y, Z, HShift, HShiftDir, HShift_local,
@@ -133,7 +135,8 @@ async def get_elab_timestamp(id_recv: int, pool: object) -> float:
except Exception as e:
logger.error(f"id {id_recv} - Error in the elaboration timestamp query: {e}")
return None
async def check_flag_elab(pool: object) -> None:
async with pool.acquire() as conn:
async with conn.cursor() as cur:

View File

@@ -1,9 +1,11 @@
import logging
import mysql.connector
from mysql.connector import Error
logger = logging.getLogger(__name__)
def connetti_db(cfg: object) -> object:
"""
Establishes a connection to a MySQL database.
@@ -21,14 +23,10 @@ def connetti_db(cfg: object) -> object:
A MySQL connection object if the connection is successful, otherwise None.
"""
try:
conn = mysql.connector.connect(user=cfg.dbuser,
password=cfg.dbpass,
host=cfg.dbhost,
port=cfg.dbport,
database=cfg.dbname)
conn = mysql.connector.connect(user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport, database=cfg.dbname)
conn.autocommit = True
logger.info("Connected")
return conn
except Error as e:
logger.error(f"Database connection error: {e}")
raise # Re-raise the exception to be handled by the caller
raise # Re-raise the exception to be handled by the caller

View File

@@ -1,10 +1,10 @@
#!.venv/bin/python
import logging
import asyncio
from utils.database import FLAG_TO_TIMESTAMP, BATCH_SIZE
import logging
from datetime import datetime, timedelta
from utils.database import BATCH_SIZE, FLAG_TO_TIMESTAMP
logger = logging.getLogger(__name__)
@@ -75,13 +75,15 @@ async def load_data(cfg: object, matrice_valori: list, pool: object, type: str)
`ValD` = IF({cfg.dbrawdata}.`ValD` != new_data.ValD AND new_data.`ValD` IS NOT NULL, new_data.ValD, {cfg.dbrawdata}.`ValD`),
`ValE` = IF({cfg.dbrawdata}.`ValE` != new_data.ValE AND new_data.`ValE` IS NOT NULL, new_data.ValE, {cfg.dbrawdata}.`ValE`),
`ValF` = IF({cfg.dbrawdata}.`ValF` != new_data.ValF AND new_data.`ValF` IS NOT NULL, new_data.ValF, {cfg.dbrawdata}.`ValF`),
`BatLevelModule` = IF({cfg.dbrawdata}.`BatLevelModule` != new_data.BatLevelModule, new_data.BatLevelModule, {cfg.dbrawdata}.`BatLevelModule`),
`TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, {cfg.dbrawdata}.`TemperatureModule`),
`BatLevelModule` = IF({cfg.dbrawdata}.`BatLevelModule` != new_data.BatLevelModule, new_data.BatLevelModule,
{cfg.dbrawdata}.`BatLevelModule`),
`TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule,
{cfg.dbrawdata}.`TemperatureModule`),
`RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`),
`Created_at` = NOW()
"""
#logger.info(f"Query insert: {sql_load_RAWDATA}.")
#logger.info(f"Value matrix to insert: {matrice_valori}.")
# logger.info(f"Query insert: {sql_load_RAWDATA}.")
# logger.info(f"Value matrix to insert: {matrice_valori}.")
rc = False
async with pool.acquire() as conn:
async with conn.cursor() as cur:
@@ -90,12 +92,12 @@ async def load_data(cfg: object, matrice_valori: list, pool: object, type: str)
logger.info(f"Loading data attempt {attempt + 1}.")
for i in range(0, len(matrice_valori), BATCH_SIZE):
batch = matrice_valori[i:i + BATCH_SIZE]
batch = matrice_valori[i : i + BATCH_SIZE]
await cur.executemany(sql_load_RAWDATA, batch)
await conn.commit()
logger.info(f"Completed batch {i//BATCH_SIZE + 1}/{(len(matrice_valori)-1)//BATCH_SIZE + 1}")
logger.info(f"Completed batch {i // BATCH_SIZE + 1}/{(len(matrice_valori) - 1) // BATCH_SIZE + 1}")
logger.info("Data loaded.")
rc = True
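The slicing above feeds executemany() in chunks; a stand-alone sketch of the same BATCH_SIZE arithmetic with an invented list.

BATCH_SIZE = 3
matrice_valori = list(range(8))
for i in range(0, len(matrice_valori), BATCH_SIZE):
    batch = matrice_valori[i : i + BATCH_SIZE]
    print(f"batch {i // BATCH_SIZE + 1}/{(len(matrice_valori) - 1) // BATCH_SIZE + 1}: {batch}")
# batch 1/3: [0, 1, 2]  ...  batch 3/3: [6, 7]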
@@ -106,9 +108,7 @@ async def load_data(cfg: object, matrice_valori: list, pool: object, type: str)
# logger.error(f"Value matrix to insert: {batch}.")
if e.args[0] == 1213: # Deadlock detected
logger.warning(
f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}"
)
logger.warning(f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}")
if attempt < cfg.max_retries - 1:
delay = 2 * attempt
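A self-contained sketch of the deadlock retry loop, with a fake operation that fails twice before succeeding; the real loop reads cfg.max_retries and checks e.args[0] == 1213, both replaced here by invented stand-ins.

import asyncio

class FakeDeadlock(Exception):
    """Stand-in for MySQL error 1213 (deadlock)."""

async def execute_with_retry(op, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            return await op(attempt)
        except FakeDeadlock:
            if attempt < max_retries - 1:
                await asyncio.sleep(2 * attempt)  # linear backoff before retrying
            else:
                raise

async def flaky(attempt: int) -> str:
    if attempt < 2:
        raise FakeDeadlock()
    return "committed"

print(asyncio.run(execute_with_retry(flaky)))  # committed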
@@ -159,9 +159,7 @@ async def unlock(cfg: object, id: int, pool: object) -> None:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await cur.execute(
f"update {cfg.dbrectable} set locked = 0 where id = {id}"
)
await cur.execute(f"update {cfg.dbrectable} set locked = 0 where id = {id}")
await conn.commit()
logger.info(f"id {id} unlocked.")
except Exception as e:
@@ -184,7 +182,8 @@ async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool: object) -> tup
async with pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await cur.execute(f'''select m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate
await cur.execute(f'''select m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api,
u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate
from matfuncs as m
inner join tools as t on t.matfunc = m.id
inner join units as u on u.id = t.unit_id
@@ -194,6 +193,7 @@ async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool: object) -> tup
except Exception as e:
logger.error(f"Error: {e}")
async def find_nearest_timestamp(cfg: object, unit_tool_data: dict, pool: object) -> tuple:
"""
Finds the nearest timestamp in the raw data table based on a reference timestamp
@@ -222,11 +222,12 @@ async def find_nearest_timestamp(cfg: object, unit_tool_data: dict, pool: object
try:
await cur.execute(f'''SELECT TIMESTAMP(`EventDate`, `EventTime`) AS event_timestamp, BatLevel, Temperature
FROM {cfg.dbrawdata}
WHERE UnitName = "{unit_tool_data["unit"]}" AND ToolNameID = "{unit_tool_data["tool"]}" AND NodeNum = {unit_tool_data["node_num"]}
WHERE UnitName = "{unit_tool_data["unit"]}" AND ToolNameID = "{unit_tool_data["tool"]}"
AND NodeNum = {unit_tool_data["node_num"]}
AND TIMESTAMP(`EventDate`, `EventTime`) BETWEEN "{start_timestamp}" AND "{end_timestamp}"
ORDER BY ABS(TIMESTAMPDIFF(SECOND, TIMESTAMP(`EventDate`, `EventTime`), "{ref_timestamp}"))
LIMIT 1
''')
return await cur.fetchone()
except Exception as e:
logger.error(f"Error: {e}")
logger.error(f"Error: {e}")
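What the ORDER BY ABS(TIMESTAMPDIFF(...)) LIMIT 1 above computes, expressed in plain Python over a few invented readings.

from datetime import datetime

readings = [
    (datetime(2025, 9, 22, 10, 0), 3.6, 21.4),
    (datetime(2025, 9, 22, 10, 30), 3.5, 21.9),
    (datetime(2025, 9, 22, 11, 0), 3.5, 22.3),
]
ref = datetime(2025, 9, 22, 10, 40)
nearest = min(readings, key=lambda r: abs((r[0] - ref).total_seconds()))
print(nearest)  # (datetime.datetime(2025, 9, 22, 10, 30), 3.5, 21.9)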

View File

@@ -1,9 +1,10 @@
import aiomysql
import logging
import aiomysql
logger = logging.getLogger(__name__)
async def get_nodes_type(cfg: object, tool: str, unit: str, pool: object) -> tuple:
"""Recupera le informazioni sui nodi (tipo, canali, input) per un dato strumento e unità.
@@ -39,8 +40,8 @@ async def get_nodes_type(cfg: object, tool: str, unit: str, pool: object) -> tup
else:
channels, types, ains, dins = [], [], [], []
for row in results:
channels.append(row['channels'])
types.append(row['type'])
ains.append(row['ain'])
dins.append(row['din'])
channels.append(row["channels"])
types.append(row["type"])
ains.append(row["ain"])
dins.append(row["din"])
return channels, types, ains, dins

View File

@@ -1,11 +1,11 @@
import glob
import os
from itertools import cycle, chain
import logging
import os
from itertools import chain, cycle
logger = logging.getLogger()
def alterna_valori(*valori: any, ping_pong: bool = False) -> any:
"""
Generates a cyclic sequence of values, with an option for a "ping-pong" sequence.
@@ -64,13 +64,13 @@ async def read_error_lines_from_logs(base_path: str, pattern: str) -> tuple[list
for file_path in matching_files:
try:
with open(file_path, 'r', encoding='utf-8') as file:
with open(file_path, encoding="utf-8") as file:
lines = file.readlines()
# Use dict.fromkeys() to preserve order and remove duplicate warning lines
non_empty_lines = [line.strip() for line in lines if line.strip()]
errors = [line for line in non_empty_lines if line.startswith('Error')]
warnings = list(dict.fromkeys(line for line in non_empty_lines if not line.startswith('Error')))
errors = [line for line in non_empty_lines if line.startswith("Error")]
warnings = list(dict.fromkeys(line for line in non_empty_lines if not line.startswith("Error")))
except Exception as e:
logger.error(f"Error while reading file {file_path}: {e}")
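A stand-alone sketch of the error/warning split and the order-preserving dict.fromkeys() de-duplication used above; the log lines are invented.

lines = [
    "Warning: sensor 3 out of range",
    "Error: MATLAB runtime not found",
    "Warning: sensor 3 out of range",
    "",
]
non_empty_lines = [line.strip() for line in lines if line.strip()]
errors = [line for line in non_empty_lines if line.startswith("Error")]
warnings = list(dict.fromkeys(line for line in non_empty_lines if not line.startswith("Error")))
print(errors)    # ['Error: MATLAB runtime not found']
print(warnings)  # ['Warning: sensor 3 out of range']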

View File

@@ -1,9 +1,11 @@
import logging
import asyncio
import os
import aiomysql
import contextvars
from typing import Callable, Coroutine, Any
import logging
import os
from collections.abc import Callable, Coroutine
from typing import Any
import aiomysql
# Create a context variable to identify the worker
worker_context = contextvars.ContextVar("worker_id", default="^-^")
@@ -35,9 +37,7 @@ def setup_logging(log_filename: str, log_level_str: str):
"""
logger = logging.getLogger()
handler = logging.FileHandler(log_filename)
formatter = WorkerFormatter(
"%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s"
)
formatter = WorkerFormatter("%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s")
handler.setFormatter(formatter)
# Remove any existing handlers and add ours
@@ -83,11 +83,7 @@ async def run_orchestrator(
pool_recycle=3600,
)
tasks = [
asyncio.create_task(worker_coro(i, cfg, pool))
for i in range(cfg.max_threads)
]
tasks = [asyncio.create_task(worker_coro(i, cfg, pool)) for i in range(cfg.max_threads)]
logger.info("System started correctly. Waiting for new tasks...")
@@ -101,4 +97,4 @@ async def run_orchestrator(
logger.info("Info: Shutdown requested... closing down")
except Exception as e:
logger.error(f"Main error: {e}", exc_info=debug_mode)
logger.error(f"Main error: {e}", exc_info=debug_mode)
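A hedged, minimal sketch of the contextvars-based worker tag that the orchestrator's log format relies on; the WorkerFormatter body below is illustrative, since the diff only shows its format string.

import contextvars
import logging

worker_context = contextvars.ContextVar("worker_id", default="^-^")

class WorkerFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        record.worker_id = worker_context.get()  # inject the current worker id
        return super().format(record)

handler = logging.StreamHandler()
handler.setFormatter(WorkerFormatter("Worker-%(worker_id)s %(levelname)s: %(message)s"))
demo_logger = logging.getLogger("demo")
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.INFO)

worker_context.set(2)
demo_logger.info("task claimed")  # -> Worker-2 INFO: task claimed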

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'cr1000x_cr1000x' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'd2w_d2w' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as channels_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'g201_g201' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await channels_main_loader(cfg, id, pool,"channels")
await channels_main_loader(cfg, id, pool, "channels")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'g301_g301' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'g801_iptm' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as analog_dig_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'g801_loc' type.

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'g801_mums' type.

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as musa_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'g801_musa' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await musa_main_loader(cfg, id, pool, "musa")
await musa_main_loader(cfg, id, pool, "musa")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as channels_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'g801_mux' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await channels_main_loader(cfg, id, pool, "channels")
await channels_main_loader(cfg, id, pool, "channels")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'g802_dsas' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as gd_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'g802_gd' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await gd_main_loader(cfg, id, pool, "gd")
await gd_main_loader(cfg, id, pool, "gd")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as analog_dig_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'g802_loc' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await analog_dig_main_loader(cfg, id, pool, "analogic_digital")
await analog_dig_main_loader(cfg, id, pool, "analogic_digital")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'g802_modb' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'g802_mums' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as channels_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'g802_mux' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await channels_main_loader(cfg, id, pool, "channels")
await channels_main_loader(cfg, id, pool, "channels")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as tlp_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'gs1_gs1' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await tlp_main_loader(cfg, id, pool, "tlp")
await tlp_main_loader(cfg, id, pool, "tlp")

View File

@@ -1,7 +1,7 @@
from utils.csv.loaders import main_old_script_loader as hirpinia_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'hirpinia_hirpinia' type.

View File

@@ -1,7 +1,7 @@
from utils.csv.loaders import main_loader as pipe_sep_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'hortus_hortus' type.
@@ -13,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")

View File

@@ -1,7 +1,7 @@
from utils.csv.loaders import main_old_script_loader as vulink_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'isi_csv_log_vulink' type.

View File

@@ -1,7 +1,7 @@
from utils.csv.loaders import main_old_script_loader as sisgeo_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'sisgeo_health' type.

View File

@@ -1,7 +1,7 @@
from utils.csv.loaders import main_old_script_loader as sisgeo_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'sisgeo_readings' type.

View File

@@ -1,7 +1,7 @@
from utils.csv.loaders import main_old_script_loader as sorotecPini_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'sorotecpini_co' type.

View File

@@ -1,7 +1,7 @@
from utils.csv.loaders import main_old_script_loader as ts_pini_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'stazionetotale_integrity_monitor' type.

View File

@@ -1,7 +1,7 @@
from utils.csv.loaders import main_old_script_loader as ts_pini_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'stazionetotale_messpunktepini' type.

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as analog_dig_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'tlp_loc' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await analog_dig_main_loader(cfg, id, pool, "analogic_digital")
await analog_dig_main_loader(cfg, id, pool, "analogic_digital")

View File

@@ -1,5 +1,6 @@
from utils.csv.loaders import main_loader as tlp_main_loader
async def main_loader(cfg: object, id: int, pool: object) -> None:
"""
Loads and processes CSV data specific to the 'tlp_tlp' type.
@@ -12,4 +13,4 @@ async def main_loader(cfg: object, id: int, pool: object) -> None:
id (int): The ID of the CSV record to process.
pool (object): The database connection pool.
"""
await tlp_main_loader(cfg, id, pool, "tlp")
await tlp_main_loader(cfg, id, pool, "tlp")

View File

@@ -1,6 +1,7 @@
from datetime import datetime
def normalizza_data(data_string: str)->str:
def normalizza_data(data_string: str) -> str:
"""
Normalizes a date string to the YYYY-MM-DD format, trying several input formats.
@@ -12,7 +13,12 @@ def normalizza_data(data_string: str)->str:
or None if the string cannot be interpreted as a date.
"""
formato_desiderato = "%Y-%m-%d"
formati_input = ["%Y/%m/%d", "%Y-%m-%d", "%d-%m-%Y","%d/%m/%Y", ] # Order matters: try the most likely format first
formati_input = [
"%Y/%m/%d",
"%Y-%m-%d",
"%d-%m-%Y",
"%d/%m/%Y",
] # Order matters: try the most likely format first
for formato_input in formati_input:
try:
@@ -23,6 +29,7 @@ def normalizza_data(data_string: str)->str:
return None # If no format succeeded
def normalizza_orario(orario_str):
try:
# Try HH:MM:SS first
@@ -34,4 +41,4 @@ def normalizza_orario(orario_str):
dt = datetime.strptime(orario_str, "%H:%M")
return dt.strftime("%H:%M:%S")
except ValueError:
return orario_str # Return the original string if it cannot be parsed
return orario_str # Return the original string if it cannot be parsed
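A self-contained sketch of the try-formats-in-order approach used by normalizza_data, with a few sample inputs; the format list mirrors the one above.

from datetime import datetime

def to_iso_date(value: str) -> str | None:
    for fmt in ("%Y/%m/%d", "%Y-%m-%d", "%d-%m-%Y", "%d/%m/%Y"):
        try:
            return datetime.strptime(value, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    return None  # no format matched

print(to_iso_date("22/09/2025"))  # 2025-09-22
print(to_iso_date("2025/09/22"))  # 2025-09-22
print(to_iso_date("not a date"))  # None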