add src path
This commit is contained in:
97
src/elab_orchestrator.py
Executable file
97
src/elab_orchestrator.py
Executable file
@@ -0,0 +1,97 @@
|
||||
#!.venv/bin/python
|
||||
|
||||
# Import necessary libraries
|
||||
import logging
|
||||
import asyncio
|
||||
|
||||
# Import custom modules for configuration and database connection
|
||||
from utils.config import loader_matlab_elab as setting
|
||||
from utils.database import WorkflowFlags
|
||||
from utils.database.matlab_query import get_matlab_command
|
||||
from utils.csv.loaders import get_next_csv_atomic
|
||||
from utils.orchestrator_utils import run_orchestrator, worker_context
|
||||
from utils.database.loader_action import update_status, unlock
|
||||
|
||||
# Initialize the logger for this module
|
||||
logger = logging.getLogger()
|
||||
|
||||
# Delay tra un processamento CSV e il successivo (in secondi)
|
||||
ELAB_PROCESSING_DELAY = 0.2
|
||||
# Tempo di attesa se non ci sono record da elaborare
|
||||
NO_RECORD_SLEEP = 60
|
||||
|
||||
async def worker(worker_id: int, cfg: object, pool: object) -> None:
|
||||
"""Esegue il ciclo di lavoro per l'elaborazione dei dati caricati.
|
||||
|
||||
Il worker preleva un record dal database che indica dati pronti per
|
||||
l'elaborazione, esegue un comando Matlab associato e attende
|
||||
prima di iniziare un nuovo ciclo.
|
||||
|
||||
Args:
|
||||
worker_id (int): L'ID univoco del worker.
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
# Imposta il context per questo worker
|
||||
worker_context.set(f"W{worker_id:02d}")
|
||||
|
||||
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
|
||||
logger.info("Avviato")
|
||||
|
||||
while True:
|
||||
try:
|
||||
logger.info("Inizio elaborazione")
|
||||
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.DATA_LOADED, WorkflowFlags.DATA_ELABORATED)
|
||||
if record:
|
||||
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
|
||||
if tool_type.lower() != "gd": # i tool GD non devono essere elaborati
|
||||
tool_elab_info = await get_matlab_command(cfg, tool_name.upper(), unit_name.upper(), pool)
|
||||
if tool_elab_info:
|
||||
if tool_elab_info['statustools'].lower() in cfg.elab_status:
|
||||
logger.info(f"Elaborazione id {id} per {unit_name} {tool_name} ")
|
||||
|
||||
matlab_cmd = f"timeout {cfg.matlab_timeout} ./run_{tool_elab_info['matcall']}.sh {cfg.matlab_runtime} {unit_name.upper()} {tool_name.upper()}"
|
||||
proc = await asyncio.create_subprocess_shell(
|
||||
matlab_cmd,
|
||||
cwd=cfg.matlab_func_path,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE
|
||||
)
|
||||
|
||||
stdout, stderr = await proc.communicate()
|
||||
|
||||
if proc.returncode != 0:
|
||||
logger.error("Errore durante l'elaborazione")
|
||||
logger.error(stderr.decode().strip())
|
||||
with open(f"{cfg.matlab_error_path}{unit_name}{tool_name}_output_error.txt", "w") as f:
|
||||
f.write(stderr.decode().strip())
|
||||
else:
|
||||
logger.info(stdout.decode().strip())
|
||||
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
|
||||
await unlock(cfg, id, pool)
|
||||
await asyncio.sleep(ELAB_PROCESSING_DELAY)
|
||||
else:
|
||||
logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['statustools']}: MatLab calc by-passed.")
|
||||
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
|
||||
await update_status(cfg, id, WorkflowFlags.DUMMY_ELABORATED, pool)
|
||||
await unlock(cfg, id, pool)
|
||||
else:
|
||||
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
|
||||
await update_status(cfg, id, WorkflowFlags.DUMMY_ELABORATED, pool)
|
||||
await unlock(cfg, id, pool)
|
||||
|
||||
else:
|
||||
logger.info("Nessun record disponibile")
|
||||
await asyncio.sleep(NO_RECORD_SLEEP)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
async def main():
|
||||
"""Funzione principale che avvia l'elab_orchestrator."""
|
||||
await run_orchestrator(setting.Config, worker)
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
148
src/ftp_csv_receiver.py
Executable file
148
src/ftp_csv_receiver.py
Executable file
@@ -0,0 +1,148 @@
|
||||
#!.venv/bin/python
|
||||
"""This module implements an FTP server with custom commands for managing virtual users and handling CSV file uploads."""
|
||||
import os
|
||||
import logging
|
||||
|
||||
from hashlib import sha256
|
||||
from pathlib import Path
|
||||
|
||||
from utils.config import loader_ftp_csv as setting
|
||||
from utils.database.connection import connetti_db
|
||||
from utils.ftp import user_admin, file_management
|
||||
|
||||
from pyftpdlib.handlers import FTPHandler
|
||||
from pyftpdlib.servers import FTPServer
|
||||
from pyftpdlib.authorizers import DummyAuthorizer, AuthenticationFailed
|
||||
|
||||
# Configure logging (moved inside main function)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class DummySha256Authorizer(DummyAuthorizer):
|
||||
"""Custom authorizer that uses SHA256 for password hashing and manages users from a database."""
|
||||
|
||||
def __init__(self: object, cfg: object) -> None:
|
||||
"""Initializes the authorizer, adds the admin user, and loads users from the database.
|
||||
|
||||
Args:
|
||||
cfg: The configuration object.
|
||||
"""
|
||||
super().__init__()
|
||||
self.add_user(
|
||||
cfg.adminuser[0], cfg.adminuser[1], cfg.adminuser[2], perm=cfg.adminuser[3])
|
||||
|
||||
# Define the database connection
|
||||
conn = connetti_db(cfg)
|
||||
|
||||
|
||||
# Create a cursor
|
||||
cur = conn.cursor()
|
||||
cur.execute(f'SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE disabled_at IS NULL')
|
||||
|
||||
for ftpuser, hash, virtpath, perm in cur.fetchall():
|
||||
self.add_user(ftpuser, hash, virtpath, perm)
|
||||
"""
|
||||
Create the user's directory if it does not exist.
|
||||
"""
|
||||
try:
|
||||
Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
|
||||
except Exception as e:
|
||||
self.responde(f'551 Error in create virtual user path: {e}')
|
||||
|
||||
def validate_authentication(self: object, username: str, password: str, handler: object) -> None:
|
||||
# Validate the user's password against the stored hash
|
||||
hash = sha256(password.encode("UTF-8")).hexdigest()
|
||||
try:
|
||||
if self.user_table[username]["pwd"] != hash:
|
||||
raise KeyError
|
||||
except KeyError:
|
||||
raise AuthenticationFailed
|
||||
|
||||
class ASEHandler(FTPHandler):
|
||||
"""Custom FTP handler that extends FTPHandler with custom commands and file handling."""
|
||||
|
||||
def __init__(self: object, conn: object, server: object, ioloop:object=None) -> None:
|
||||
"""Initializes the handler, adds custom commands, and sets up command permissions.
|
||||
|
||||
Args:
|
||||
conn (object): The connection object.
|
||||
server (object): The FTP server object.
|
||||
ioloop (object): The I/O loop object.
|
||||
"""
|
||||
super().__init__(conn, server, ioloop)
|
||||
self.proto_cmds = FTPHandler.proto_cmds.copy()
|
||||
# Add custom FTP commands for managing virtual users - command in lowercase
|
||||
self.proto_cmds.update(
|
||||
{'SITE ADDU': dict(perm='M', auth=True, arg=True,
|
||||
help='Syntax: SITE <SP> ADDU USERNAME PASSWORD (add virtual user).')}
|
||||
)
|
||||
self.proto_cmds.update(
|
||||
{'SITE DISU': dict(perm='M', auth=True, arg=True,
|
||||
help='Syntax: SITE <SP> DISU USERNAME (disable virtual user).')}
|
||||
)
|
||||
self.proto_cmds.update(
|
||||
{'SITE ENAU': dict(perm='M', auth=True, arg=True,
|
||||
help='Syntax: SITE <SP> ENAU USERNAME (enable virtual user).')}
|
||||
)
|
||||
self.proto_cmds.update(
|
||||
{'SITE LSTU': dict(perm='M', auth=True, arg=None,
|
||||
help='Syntax: SITE <SP> LSTU (list virtual users).')}
|
||||
)
|
||||
|
||||
def on_file_received(self: object, file: str) -> None:
|
||||
return file_management.on_file_received(self, file)
|
||||
|
||||
def on_incomplete_file_received(self: object, file: str) -> None:
|
||||
"""Removes partially uploaded files.
|
||||
Args:
|
||||
file: The path to the incomplete file.
|
||||
"""
|
||||
os.remove(file)
|
||||
|
||||
def ftp_SITE_ADDU(self: object, line: str) -> None:
|
||||
return user_admin.ftp_SITE_ADDU(self, line)
|
||||
|
||||
def ftp_SITE_DISU(self: object, line: str) -> None:
|
||||
return user_admin.ftp_SITE_DISU(self, line)
|
||||
|
||||
def ftp_SITE_ENAU(self: object, line: str) -> None:
|
||||
return user_admin.ftp_SITE_ENAU(self, line)
|
||||
|
||||
def ftp_SITE_LSTU(self: object, line: str) -> None:
|
||||
return user_admin.ftp_SITE_LSTU(self, line)
|
||||
|
||||
def main():
|
||||
"""Main function to start the FTP server."""
|
||||
# Load the configuration settings
|
||||
cfg = setting.Config()
|
||||
|
||||
try:
|
||||
# Initialize the authorizer and handler
|
||||
authorizer = DummySha256Authorizer(cfg)
|
||||
handler = ASEHandler
|
||||
handler.cfg = cfg
|
||||
handler.authorizer = authorizer
|
||||
handler.masquerade_address = cfg.proxyaddr
|
||||
# Set the range of passive ports for the FTP server
|
||||
_range = list(range(cfg.firstport, cfg.firstport + cfg.portrangewidth))
|
||||
handler.passive_ports = _range
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
|
||||
# Use cfg.logfilename directly without checking its existence
|
||||
filename=cfg.logfilename,
|
||||
level=logging.INFO,
|
||||
)
|
||||
|
||||
# Create and start the FTP server
|
||||
server = FTPServer(("0.0.0.0", cfg.service_port), handler)
|
||||
server.serve_forever()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Exit with error: {e}."
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
130
src/load_ftp_users.py
Normal file
130
src/load_ftp_users.py
Normal file
@@ -0,0 +1,130 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Script per prelevare dati da MySQL e inviare comandi SITE FTP
|
||||
"""
|
||||
|
||||
import mysql.connector
|
||||
from utils.database.connection import connetti_db
|
||||
from utils.config import users_loader as setting
|
||||
from ftplib import FTP
|
||||
import logging
|
||||
import sys
|
||||
from typing import List, Tuple
|
||||
|
||||
# Configurazione logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Configurazione server FTP
|
||||
FTP_CONFIG = {
|
||||
'host': 'localhost',
|
||||
'user': 'admin',
|
||||
'password': 'batt1l0',
|
||||
'port': 2121
|
||||
}
|
||||
|
||||
def connect_ftp() -> FTP:
|
||||
"""Connessione al server FTP"""
|
||||
try:
|
||||
ftp = FTP()
|
||||
ftp.connect(FTP_CONFIG['host'], FTP_CONFIG['port'])
|
||||
ftp.login(FTP_CONFIG['user'], FTP_CONFIG['password'])
|
||||
logger.info("Connessione FTP stabilita")
|
||||
return ftp
|
||||
except Exception as e:
|
||||
logger.error(f"Errore connessione FTP: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
def fetch_data_from_db(connection: mysql.connector.MySQLConnection) -> List[Tuple]:
|
||||
"""Preleva i dati dal database"""
|
||||
try:
|
||||
cursor = connection.cursor()
|
||||
|
||||
# Modifica questa query secondo le tue esigenze
|
||||
query = """
|
||||
SELECT username, password
|
||||
FROM ase_lar.ftp_accounts
|
||||
"""
|
||||
|
||||
cursor.execute(query)
|
||||
results = cursor.fetchall()
|
||||
|
||||
logger.info(f"Prelevate {len(results)} righe dal database")
|
||||
return results
|
||||
|
||||
except mysql.connector.Error as e:
|
||||
logger.error(f"Errore query database: {e}")
|
||||
return []
|
||||
finally:
|
||||
cursor.close()
|
||||
|
||||
def send_site_command(ftp: FTP, command: str) -> bool:
|
||||
"""Invia un comando SITE al server FTP"""
|
||||
try:
|
||||
# Il comando SITE viene inviato usando sendcmd
|
||||
response = ftp.sendcmd(f"SITE {command}")
|
||||
logger.info(f"Comando SITE '{command}' inviato. Risposta: {response}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Errore invio comando SITE '{command}': {e}")
|
||||
return False
|
||||
|
||||
def main():
|
||||
"""Funzione principale"""
|
||||
logger.info("Avvio script caricamento utenti FTP")
|
||||
cfg = setting.Config()
|
||||
|
||||
# Connessioni
|
||||
db_connection = connetti_db(cfg)
|
||||
ftp_connection = connect_ftp()
|
||||
|
||||
try:
|
||||
# Preleva dati dal database
|
||||
data = fetch_data_from_db(db_connection)
|
||||
|
||||
if not data:
|
||||
logger.warning("Nessun dato trovato nel database")
|
||||
return
|
||||
|
||||
success_count = 0
|
||||
error_count = 0
|
||||
|
||||
# Processa ogni riga
|
||||
for row in data:
|
||||
username, password = row
|
||||
|
||||
# Costruisci il comando SITE completo
|
||||
ftp_site_command = f'addu {username} {password}'
|
||||
|
||||
logger.info(f"Sending ftp command: {ftp_site_command}")
|
||||
|
||||
# Invia comando SITE
|
||||
if send_site_command(ftp_connection, ftp_site_command):
|
||||
success_count += 1
|
||||
else:
|
||||
error_count += 1
|
||||
|
||||
logger.info(f"Elaborazione completata. Successi: {success_count}, Errori: {error_count}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Errore generale: {e}")
|
||||
|
||||
finally:
|
||||
# Chiudi connessioni
|
||||
try:
|
||||
ftp_connection.quit()
|
||||
logger.info("Connessione FTP chiusa")
|
||||
except Exception as e:
|
||||
logger.error(f"Errore chiusura connessione FTP: {e}")
|
||||
|
||||
try:
|
||||
db_connection.close()
|
||||
logger.info("Connessione MySQL chiusa")
|
||||
except Exception as e:
|
||||
logger.error(f"Errore chiusura connessione MySQL: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
120
src/load_orchestrator.py
Executable file
120
src/load_orchestrator.py
Executable file
@@ -0,0 +1,120 @@
|
||||
#!.venv/bin/python
|
||||
|
||||
# Import necessary libraries
|
||||
import logging
|
||||
import importlib
|
||||
import asyncio
|
||||
|
||||
# Import custom modules for configuration and database connection
|
||||
from utils.config import loader_load_data as setting
|
||||
from utils.database import WorkflowFlags
|
||||
from utils.csv.loaders import get_next_csv_atomic
|
||||
from utils.orchestrator_utils import run_orchestrator, worker_context
|
||||
|
||||
# Initialize the logger for this module
|
||||
logger = logging.getLogger()
|
||||
|
||||
# Delay tra un processamento CSV e il successivo (in secondi)
|
||||
CSV_PROCESSING_DELAY = 0.2
|
||||
# Tempo di attesa se non ci sono record da elaborare
|
||||
NO_RECORD_SLEEP = 60
|
||||
|
||||
async def worker(worker_id: int, cfg: object, pool: object) -> None:
|
||||
"""Esegue il ciclo di lavoro per l'elaborazione dei file CSV.
|
||||
|
||||
Il worker preleva un record CSV dal database, ne elabora il contenuto
|
||||
e attende prima di iniziare un nuovo ciclo.
|
||||
|
||||
Args:
|
||||
worker_id (int): L'ID univoco del worker.
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
# Imposta il context per questo worker
|
||||
worker_context.set(f"W{worker_id:02d}")
|
||||
|
||||
logger.info("Avviato")
|
||||
|
||||
while True:
|
||||
try:
|
||||
logger.info("Inizio elaborazione")
|
||||
|
||||
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.CSV_RECEIVED, WorkflowFlags.DATA_LOADED)
|
||||
|
||||
if record:
|
||||
success = await load_csv(record, cfg, pool)
|
||||
if not success:
|
||||
logger.error("Errore durante l'elaborazione")
|
||||
await asyncio.sleep(CSV_PROCESSING_DELAY)
|
||||
else:
|
||||
logger.info("Nessun record disponibile")
|
||||
await asyncio.sleep(NO_RECORD_SLEEP)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=1)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
async def load_csv(record: tuple, cfg: object, pool: object) -> bool:
|
||||
"""Carica ed elabora un record CSV utilizzando il modulo di parsing appropriato.
|
||||
|
||||
Args:
|
||||
record: Una tupla contenente i dettagli del record CSV da elaborare (id, unit_type, tool_type, unit_name, tool_name).
|
||||
cfg: L'oggetto di configurazione contenente i parametri del sistema.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
|
||||
Returns:
|
||||
True se l'elaborazione del CSV è avvenuta con successo, False altrimenti.
|
||||
"""
|
||||
|
||||
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
|
||||
logger.debug("Inizio ricerca nuovo CSV da elaborare")
|
||||
|
||||
id, unit_type, tool_type, unit_name, tool_name = [
|
||||
x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record
|
||||
]
|
||||
logger.info(
|
||||
f"Trovato CSV da elaborare: ID={id}, Tipo={unit_type}_{tool_type}, Nome={unit_name}_{tool_name}"
|
||||
)
|
||||
|
||||
# Costruisce il nome del modulo da caricare dinamicamente
|
||||
module_names = [
|
||||
f"utils.parsers.by_name.{unit_name}_{tool_name}",
|
||||
f"utils.parsers.by_name.{unit_name}_{tool_type}",
|
||||
f"utils.parsers.by_name.{unit_name}_all",
|
||||
f"utils.parsers.by_type.{unit_type}_{tool_type}",
|
||||
]
|
||||
modulo = None
|
||||
for module_name in module_names:
|
||||
try:
|
||||
logger.debug(f"Caricamento dinamico del modulo: {module_name}")
|
||||
modulo = importlib.import_module(module_name)
|
||||
logger.info(f"Funzione 'main_loader' caricata dal modulo {module_name}")
|
||||
break
|
||||
except (ImportError, AttributeError) as e:
|
||||
logger.debug(
|
||||
f"Modulo {module_name} non presente o non valido. {e}",
|
||||
exc_info=debug_mode,
|
||||
)
|
||||
|
||||
if not modulo:
|
||||
logger.error(f"Nessun modulo trovato {module_names}")
|
||||
return False
|
||||
|
||||
# Ottiene la funzione 'main_loader' dal modulo
|
||||
funzione = getattr(modulo, "main_loader")
|
||||
|
||||
# Esegui la funzione
|
||||
logger.info(f"Elaborazione con modulo {modulo} per ID={id}")
|
||||
await funzione(cfg, id, pool)
|
||||
logger.info(f"Elaborazione completata per ID={id}")
|
||||
return True
|
||||
|
||||
|
||||
async def main():
|
||||
"""Funzione principale che avvia il load_orchestrator."""
|
||||
await run_orchestrator(setting.Config, worker)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
68
src/send_orchestrator.py
Executable file
68
src/send_orchestrator.py
Executable file
@@ -0,0 +1,68 @@
|
||||
#!.venv/bin/python
|
||||
|
||||
# Import necessary libraries
|
||||
import logging
|
||||
import asyncio
|
||||
|
||||
# Import custom modules for configuration and database connection
|
||||
from utils.config import loader_send_data as setting
|
||||
from utils.database import WorkflowFlags
|
||||
from utils.csv.loaders import get_next_csv_atomic
|
||||
from utils.orchestrator_utils import run_orchestrator, worker_context
|
||||
from utils.database.loader_action import update_status, unlock
|
||||
from utils.database.elab_query import get_data_as_csv
|
||||
#from utils.ftp.elab_send import send_csv_to_customer
|
||||
|
||||
|
||||
# Initialize the logger for this module
|
||||
logger = logging.getLogger()
|
||||
|
||||
# Delay tra un processamento CSV e il successivo (in secondi)
|
||||
ELAB_PROCESSING_DELAY = 0.2
|
||||
# Tempo di attesa se non ci sono record da elaborare
|
||||
NO_RECORD_SLEEP = 60
|
||||
|
||||
async def worker(worker_id: int, cfg: object, pool: object) -> None:
|
||||
|
||||
# Imposta il context per questo worker
|
||||
worker_context.set(f"W{worker_id:02d}")
|
||||
|
||||
debug_mode = logging.getLogger().getEffectiveLevel() == logging.DEBUG
|
||||
logger.info("Avviato")
|
||||
|
||||
while True:
|
||||
try:
|
||||
logger.info("Inizio elaborazione")
|
||||
|
||||
record = await get_next_csv_atomic(pool, cfg.dbrectable, WorkflowFlags.CSV_RECEIVED, WorkflowFlags.SENT_RAW_DATA)
|
||||
|
||||
if record:
|
||||
id, unit_type, tool_type, unit_name, tool_name = [x.lower().replace(" ", "_") if isinstance(x, str) else x for x in record]
|
||||
|
||||
'''
|
||||
if tool_elab_info['ftp_send']:
|
||||
if not tool_elab_info["duedate"] or tool_elab_info["duedate"] in ('0000-00-00 00:00:00', '') or tool_elab_info["duedate"] > timestamp_matlab_elab:
|
||||
if elab_csv := await get_data_as_csv(cfg, id, unit_name, tool_name, timestamp_matlab_elab, pool):
|
||||
print(elab_csv)
|
||||
#if await send_csv_to_customer(cfg, id, unit_name, tool_name, elab_csv, pool):
|
||||
#await update_status(cfg, id, , pool)
|
||||
#await update_status(cfg, id, , pool)
|
||||
else:
|
||||
logger.info(f"id {id} - {unit_name} - {tool_name} {tool_elab_info['duedate']}: ftp put didn't executed because due date reached.")
|
||||
'''
|
||||
|
||||
else:
|
||||
logger.info("Nessun record disponibile")
|
||||
await asyncio.sleep(NO_RECORD_SLEEP)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Errore durante l'esecuzione: {e}", exc_info=debug_mode)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
async def main():
|
||||
"""Funzione principale che avvia il send_orchestrator."""
|
||||
await run_orchestrator(setting.Config, worker)
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
1
src/utils/__init__.py
Normal file
1
src/utils/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Utilità"""
|
||||
1
src/utils/config/__init__.py
Normal file
1
src/utils/config/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Config ini setting"""
|
||||
56
src/utils/config/loader_ftp_csv.py
Normal file
56
src/utils/config/loader_ftp_csv.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""set configurations
|
||||
|
||||
"""
|
||||
from configparser import ConfigParser
|
||||
|
||||
class Config:
|
||||
def __init__(self):
|
||||
|
||||
c = ConfigParser()
|
||||
c.read(["../env/ftp.ini", "../env/db.ini"])
|
||||
|
||||
# FTP setting
|
||||
self.service_port = c.getint("ftpserver", "service_port")
|
||||
self.firstport = c.getint("ftpserver", "firstPort")
|
||||
self.proxyaddr = c.get("ftpserver", "proxyAddr")
|
||||
self.portrangewidth = c.getint("ftpserver", "portRangeWidth")
|
||||
self.virtpath = c.get("ftpserver", "virtpath")
|
||||
self.adminuser = c.get("ftpserver", "adminuser").split("|")
|
||||
self.servertype = c.get("ftpserver", "servertype")
|
||||
self.certfile = c.get("ftpserver", "certfile")
|
||||
self.fileext = c.get("ftpserver", "fileext").upper().split("|")
|
||||
self.defperm = c.get("ftpserver", "defaultUserPerm")
|
||||
|
||||
# CSV FILE setting
|
||||
self.csvfs = c.get("csvfs", "path")
|
||||
|
||||
# LOG setting
|
||||
self.logfilename = c.get("logging", "logFilename")
|
||||
|
||||
# DB setting
|
||||
self.dbhost = c.get("db", "hostname")
|
||||
self.dbport = c.getint("db", "port")
|
||||
self.dbuser = c.get("db", "user")
|
||||
self.dbpass = c.get("db", "password")
|
||||
self.dbname = c.get("db", "dbName")
|
||||
self.max_retries = c.getint("db", "maxRetries")
|
||||
|
||||
|
||||
# Tables
|
||||
self.dbusertable = c.get("tables", "userTableName")
|
||||
self.dbrectable = c.get("tables", "recTableName")
|
||||
self.dbrawdata = c.get("tables", "rawTableName")
|
||||
self.dbrawdata = c.get("tables", "rawTableName")
|
||||
self.dbnodes = c.get("tables", "nodesTableName")
|
||||
|
||||
# unit setting
|
||||
self.units_name = [part for part in c.get("unit", "Names").split('|')]
|
||||
self.units_type = [part for part in c.get("unit", "Types").split('|')]
|
||||
#self.units_header = {key: int(value) for pair in c.get("unit", "Headers").split('|') for key, value in [pair.split(':')]}
|
||||
|
||||
# tool setting
|
||||
self.tools_name = [part for part in c.get("tool", "Names").split('|')]
|
||||
self.tools_type = [part for part in c.get("tool", "Types").split('|')]
|
||||
|
||||
# csv info
|
||||
self.csv_infos = [part for part in c.get("csv", "Infos").split('|')]
|
||||
32
src/utils/config/loader_load_data.py
Normal file
32
src/utils/config/loader_load_data.py
Normal file
@@ -0,0 +1,32 @@
|
||||
"""set configurations
|
||||
|
||||
"""
|
||||
from configparser import ConfigParser
|
||||
|
||||
class Config:
|
||||
def __init__(self):
|
||||
|
||||
c = ConfigParser()
|
||||
c.read(["../env/load.ini", "../env/db.ini"])
|
||||
|
||||
# LOG setting
|
||||
self.logfilename = c.get("logging", "logFilename")
|
||||
|
||||
# Worker setting
|
||||
self.max_threads = c.getint("threads", "max_num")
|
||||
|
||||
# DB setting
|
||||
self.dbhost = c.get("db", "hostname")
|
||||
self.dbport = c.getint("db", "port")
|
||||
self.dbuser = c.get("db", "user")
|
||||
self.dbpass = c.get("db", "password")
|
||||
self.dbname = c.get("db", "dbName")
|
||||
self.max_retries = c.getint("db", "maxRetries")
|
||||
|
||||
# Tables
|
||||
self.dbusertable = c.get("tables", "userTableName")
|
||||
self.dbrectable = c.get("tables", "recTableName")
|
||||
self.dbrawdata = c.get("tables", "rawTableName")
|
||||
self.dbrawdata = c.get("tables", "rawTableName")
|
||||
self.dbnodes = c.get("tables", "nodesTableName")
|
||||
|
||||
41
src/utils/config/loader_matlab_elab.py
Normal file
41
src/utils/config/loader_matlab_elab.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""set configurations
|
||||
|
||||
"""
|
||||
from configparser import ConfigParser
|
||||
|
||||
class Config:
|
||||
def __init__(self):
|
||||
|
||||
c = ConfigParser()
|
||||
c.read(["../env/elab.ini", "../env/db.ini"])
|
||||
|
||||
# LOG setting
|
||||
self.logfilename = c.get("logging", "logFilename")
|
||||
|
||||
# Worker setting
|
||||
self.max_threads = c.getint("threads", "max_num")
|
||||
|
||||
# DB setting
|
||||
self.dbhost = c.get("db", "hostname")
|
||||
self.dbport = c.getint("db", "port")
|
||||
self.dbuser = c.get("db", "user")
|
||||
self.dbpass = c.get("db", "password")
|
||||
self.dbname = c.get("db", "dbName")
|
||||
self.max_retries = c.getint("db", "maxRetries")
|
||||
|
||||
# Tables
|
||||
self.dbusertable = c.get("tables", "userTableName")
|
||||
self.dbrectable = c.get("tables", "recTableName")
|
||||
self.dbrawdata = c.get("tables", "rawTableName")
|
||||
self.dbrawdata = c.get("tables", "rawTableName")
|
||||
self.dbnodes = c.get("tables", "nodesTableName")
|
||||
|
||||
# Tool
|
||||
self.elab_status = [part for part in c.get("tool", "elab_status").split('|')]
|
||||
|
||||
# Matlab
|
||||
self.matlab_runtime = c.get("matlab", "runtime")
|
||||
self.matlab_func_path = c.get("matlab", "func_path")
|
||||
self.matlab_timeout = c.getint("matlab", "timeout")
|
||||
self.matlab_error = c.get("matlab", "error")
|
||||
self.matlab_error_path = c.get("matlab", "error_path")
|
||||
31
src/utils/config/loader_send_data.py
Normal file
31
src/utils/config/loader_send_data.py
Normal file
@@ -0,0 +1,31 @@
|
||||
"""set configurations
|
||||
|
||||
"""
|
||||
from configparser import ConfigParser
|
||||
|
||||
class Config:
|
||||
def __init__(self):
|
||||
|
||||
c = ConfigParser()
|
||||
c.read(["../env/send.ini", "../env/db.ini"])
|
||||
|
||||
# LOG setting
|
||||
self.logfilename = c.get("logging", "logFilename")
|
||||
|
||||
# Worker setting
|
||||
self.max_threads = c.getint("threads", "max_num")
|
||||
|
||||
# DB setting
|
||||
self.dbhost = c.get("db", "hostname")
|
||||
self.dbport = c.getint("db", "port")
|
||||
self.dbuser = c.get("db", "user")
|
||||
self.dbpass = c.get("db", "password")
|
||||
self.dbname = c.get("db", "dbName")
|
||||
self.max_retries = c.getint("db", "maxRetries")
|
||||
|
||||
# Tables
|
||||
self.dbusertable = c.get("tables", "userTableName")
|
||||
self.dbrectable = c.get("tables", "recTableName")
|
||||
self.dbrawdata = c.get("tables", "rawTableName")
|
||||
self.dbrawdata = c.get("tables", "rawTableName")
|
||||
self.dbnodes = c.get("tables", "nodesTableName")
|
||||
20
src/utils/config/users_loader.py
Normal file
20
src/utils/config/users_loader.py
Normal file
@@ -0,0 +1,20 @@
|
||||
"""set configurations
|
||||
|
||||
"""
|
||||
from configparser import ConfigParser
|
||||
|
||||
class Config:
|
||||
def __init__(self):
|
||||
|
||||
c = ConfigParser()
|
||||
c.read(["../env/db.ini"])
|
||||
|
||||
# DB setting
|
||||
self.dbhost = c.get("db", "hostname")
|
||||
self.dbport = c.getint("db", "port")
|
||||
self.dbuser = c.get("db", "user")
|
||||
self.dbpass = c.get("db", "password")
|
||||
self.dbname = c.get("db", "dbName")
|
||||
self.max_retries = c.getint("db", "maxRetries")
|
||||
|
||||
|
||||
1
src/utils/csv/__init__.py
Normal file
1
src/utils/csv/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Parser delle centraline"""
|
||||
236
src/utils/csv/data_preparation.py
Normal file
236
src/utils/csv/data_preparation.py
Normal file
@@ -0,0 +1,236 @@
|
||||
#!.venv/bin/python
|
||||
from utils.database.nodes_query import get_nodes_type
|
||||
from utils.timestamp.date_check import normalizza_data, normalizza_orario
|
||||
from utils.database.loader_action import find_nearest_timestamp
|
||||
import logging
|
||||
import re
|
||||
|
||||
from itertools import islice
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def get_data(cfg: object, id: int, pool: object) -> tuple:
|
||||
"""
|
||||
Retrieves unit name, tool name, and tool data for a given record ID from the database.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object containing database table name.
|
||||
id (int): The ID of the record to retrieve.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
tuple: A tuple containing unit_name, tool_name, and tool_data.
|
||||
"""
|
||||
async with pool.acquire() as conn:
|
||||
async with conn.cursor() as cur:
|
||||
await cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbrectable} where id = {id}')
|
||||
unit_name, tool_name, tool_data = await cur.fetchone()
|
||||
|
||||
return unit_name, tool_name, tool_data
|
||||
|
||||
async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list:
|
||||
"""
|
||||
Processes pipe-separated data from a CSV record into a structured matrix.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
list: A list of lists, where each inner list represents a row in the matrix.
|
||||
"""
|
||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||
righe = ToolData.splitlines()
|
||||
matrice_valori = []
|
||||
"""
|
||||
Ciclo su tutte le righe del file CSV, escludendo quelle che:
|
||||
non hanno il pattern ';|;' perché non sono dati ma è la header
|
||||
che hanno il pattern 'No RX' perché sono letture non pervenute o in errore
|
||||
che hanno il pattern '.-' perché sono letture con un numero errato - negativo dopo la virgola
|
||||
che hanno il pattern 'File Creation' perché vuol dire che c'è stato un errore della centralina
|
||||
"""
|
||||
for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]:
|
||||
timestamp, batlevel, temperature, rilevazioni = riga.split(';',3)
|
||||
EventDate, EventTime = timestamp.split(' ')
|
||||
if batlevel == '|':
|
||||
batlevel = temperature
|
||||
temperature, rilevazioni = rilevazioni.split(';',1)
|
||||
''' in alcune letture mancano temperatura e livello batteria'''
|
||||
if temperature == '':
|
||||
temperature = 0
|
||||
if batlevel == '':
|
||||
batlevel = 0
|
||||
valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;') # Toglie '|;' iniziali, toglie eventuali ';' finali, dividi per ';|;'
|
||||
for num_nodo, valori_nodo in enumerate(valori_nodi, start=1):
|
||||
valori = valori_nodo.split(';')
|
||||
matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
|
||||
|
||||
return matrice_valori
|
||||
|
||||
async def make_ain_din_matrix(cfg: object, id: int, pool: object) -> list:
|
||||
"""
|
||||
Processes analog and digital input data from a CSV record into a structured matrix.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
list: A list of lists, where each inner list represents a row in the matrix.
|
||||
"""
|
||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||
node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
|
||||
righe = ToolData.splitlines()
|
||||
matrice_valori = []
|
||||
pattern = r'^(?:\d{4}\/\d{2}\/\d{2}|\d{2}\/\d{2}\/\d{4}) \d{2}:\d{2}:\d{2}(?:;\d+\.\d+){2}(?:;\d+){4}$'
|
||||
if node_ains or node_dins:
|
||||
for riga in [riga for riga in righe if re.match(pattern, riga)]:
|
||||
timestamp, batlevel, temperature, analog_input1, analog_input2, digital_input1, digital_input2 = riga.split(';')
|
||||
EventDate, EventTime = timestamp.split(' ')
|
||||
if any(node_ains):
|
||||
for node_num, analog_act in enumerate([analog_input1, analog_input2], start=1):
|
||||
matrice_valori.append([UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + [analog_act] + ([None] * (19 - 1)))
|
||||
else:
|
||||
logger.info(f"Nessun Ingresso analogico per {UnitName} {ToolNameID}")
|
||||
if any(node_dins):
|
||||
start_node = 3 if any(node_ains) else 1
|
||||
for node_num, digital_act in enumerate([digital_input1, digital_input2], start=start_node):
|
||||
matrice_valori.append([UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + [digital_act] + ([None] * (19 - 1)))
|
||||
else:
|
||||
logger.info(f"Nessun Ingresso digitale per {UnitName} {ToolNameID}")
|
||||
|
||||
return matrice_valori
|
||||
|
||||
async def make_channels_matrix(cfg: object, id: int, pool: object) -> list:
|
||||
"""
|
||||
Processes channel-based data from a CSV record into a structured matrix.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
list: A list of lists, where each inner list represents a row in the matrix.
|
||||
"""
|
||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||
node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
|
||||
righe = ToolData.splitlines()
|
||||
matrice_valori = []
|
||||
for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]:
|
||||
timestamp, batlevel, temperature, rilevazioni = riga.replace(';|;',';').split(';',3)
|
||||
EventDate, EventTime = timestamp.split(' ')
|
||||
valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|']
|
||||
valori_iter = iter(valori_splitted)
|
||||
|
||||
valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels]
|
||||
|
||||
for num_nodo, valori in enumerate(valori_nodi, start=1):
|
||||
matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
|
||||
|
||||
return matrice_valori
|
||||
|
||||
async def make_musa_matrix(cfg: object, id: int, pool: object) -> list:
|
||||
"""
|
||||
Processes 'Musa' specific data from a CSV record into a structured matrix.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
list: A list of lists, where each inner list represents a row in the matrix.
|
||||
"""
|
||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||
node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
|
||||
righe = ToolData.splitlines()
|
||||
matrice_valori = []
|
||||
for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]:
|
||||
timestamp, batlevel, rilevazioni = riga.replace(';|;',';').split(';',2)
|
||||
if timestamp == '':
|
||||
continue
|
||||
EventDate, EventTime = timestamp.split(' ')
|
||||
temperature = rilevazioni.split(';')[0]
|
||||
logger.info(f'{temperature}, {rilevazioni}')
|
||||
valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|']
|
||||
valori_iter = iter(valori_splitted)
|
||||
|
||||
valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels]
|
||||
|
||||
for num_nodo, valori in enumerate(valori_nodi, start=1):
|
||||
matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
|
||||
|
||||
return matrice_valori
|
||||
|
||||
|
||||
async def make_tlp_matrix(cfg: object, id: int, pool: object) -> list:
|
||||
"""
|
||||
Processes 'TLP' specific data from a CSV record into a structured matrix.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
list: A list of lists, where each inner list represents a row in the matrix.
|
||||
"""
|
||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||
righe = ToolData.splitlines()
|
||||
valori_x_nodo = 2
|
||||
matrice_valori = []
|
||||
for riga in righe:
|
||||
timestamp, batlevel, temperature, barometer, rilevazioni = riga.split(';',4)
|
||||
EventDate, EventTime = timestamp.split(' ')
|
||||
lista_rilevazioni = rilevazioni.strip(';').split(';')
|
||||
lista_rilevazioni.append(barometer)
|
||||
valori_nodi = [lista_rilevazioni[i:i + valori_x_nodo] for i in range(0, len(lista_rilevazioni), valori_x_nodo)]
|
||||
for num_nodo, valori in enumerate(valori_nodi, start=1):
|
||||
matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
|
||||
return matrice_valori
|
||||
|
||||
|
||||
|
||||
async def make_gd_matrix(cfg: object, id: int, pool: object) -> list:
|
||||
"""
|
||||
Processes 'GD' specific data from a CSV record into a structured matrix.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record.
|
||||
pool (object): The database connection pool.
|
||||
Returns:
|
||||
list: A list of lists, where each inner list represents a row in the matrix.
|
||||
"""
|
||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||
righe = ToolData.splitlines()
|
||||
matrice_valori = []
|
||||
pattern = r';-?\d+dB$'
|
||||
for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]:
|
||||
timestamp, rilevazioni = riga.split(';|;',1)
|
||||
EventDate, EventTime = timestamp.split(' ')
|
||||
#logger.debug(f"GD id {id}: {pattern} {rilevazioni}")
|
||||
if re.search(pattern, rilevazioni):
|
||||
if len(matrice_valori) == 0:
|
||||
matrice_valori.append(['RSSI'])
|
||||
batlevel, temperature, rssi = rilevazioni.split(';')
|
||||
#logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {rssi}")
|
||||
|
||||
gd_timestamp = datetime.strptime(f"{normalizza_data(EventDate)} {normalizza_orario(EventTime)}", "%Y-%m-%d %H:%M:%S")
|
||||
start_timestamp = gd_timestamp - timedelta(seconds=45)
|
||||
end_timestamp = gd_timestamp + timedelta(seconds=45)
|
||||
matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 1, f"{start_timestamp:%Y-%m-%d %H:%M:%S}", f"{end_timestamp:%Y-%m-%d %H:%M:%S}", f"{gd_timestamp:%Y-%m-%d %H:%M:%S}", batlevel, temperature, int(rssi[:-2])])
|
||||
|
||||
elif all(char == ';' for char in rilevazioni):
|
||||
pass
|
||||
elif ';|;' in rilevazioni:
|
||||
unit_metrics, data = rilevazioni.split(';|;')
|
||||
batlevel, temperature = unit_metrics.split(';')
|
||||
#logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {data}")
|
||||
|
||||
dt_timestamp, dt_batlevel, dt_temperature = await find_nearest_timestamp(cfg, {"timestamp": f"{normalizza_data(EventDate)} {normalizza_orario(EventTime)}", "unit": UnitName, "tool": ToolNameID.replace("GD", "DT"), "node_num": 1}, pool)
|
||||
EventDate, EventTime = dt_timestamp.strftime('%Y-%m-%d %H:%M:%S').split(' ')
|
||||
valori = data.split(';')
|
||||
matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 2, EventDate, EventTime, float(dt_batlevel), float(dt_temperature)] + valori + ([None] * (16 - len(valori))) + [batlevel, temperature, None])
|
||||
else:
|
||||
logger.warning(f"GD id {id}: dati non trattati - {rilevazioni}")
|
||||
|
||||
return matrice_valori
|
||||
77
src/utils/csv/loaders.py
Normal file
77
src/utils/csv/loaders.py
Normal file
@@ -0,0 +1,77 @@
|
||||
from utils.database.loader_action import load_data, update_status, unlock
|
||||
from utils.database import WorkflowFlags
|
||||
from utils.csv.data_preparation import make_pipe_sep_matrix, make_ain_din_matrix, make_channels_matrix, make_tlp_matrix, make_gd_matrix, make_musa_matrix
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object, action: str) -> None:
|
||||
"""
|
||||
Main loader function to process CSV data based on the specified action.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object.
|
||||
id (int): The ID of the CSV record to process.
|
||||
pool (object): The database connection pool.
|
||||
action (str): The type of data processing to perform (e.g., "pipe_separator", "analogic_digital").
|
||||
"""
|
||||
type_matrix_mapping = {
|
||||
"pipe_separator": make_pipe_sep_matrix,
|
||||
"analogic_digital": make_ain_din_matrix,
|
||||
"channels": make_channels_matrix,
|
||||
"tlp": make_tlp_matrix,
|
||||
"gd": make_gd_matrix,
|
||||
"musa": make_musa_matrix
|
||||
}
|
||||
if action in type_matrix_mapping:
|
||||
function_to_call = type_matrix_mapping[action]
|
||||
# Create a matrix of values from the data
|
||||
matrice_valori = await function_to_call(cfg, id, pool)
|
||||
|
||||
logger.info("matrice valori creata")
|
||||
# Load the data into the database
|
||||
if await load_data(cfg, matrice_valori, pool, type=action):
|
||||
await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
|
||||
await unlock(cfg, id, pool)
|
||||
else:
|
||||
logger.warning(f"Action '{action}' non riconosciuta.")
|
||||
|
||||
|
||||
async def get_next_csv_atomic(pool, table_name, status, next_status):
|
||||
"""Preleva atomicamente il prossimo CSV da elaborare"""
|
||||
async with pool.acquire() as conn:
|
||||
# IMPORTANTE: Disabilita autocommit per questa transazione
|
||||
await conn.begin()
|
||||
|
||||
try:
|
||||
async with conn.cursor() as cur:
|
||||
# Usa SELECT FOR UPDATE per lock atomico
|
||||
|
||||
await cur.execute(f"""
|
||||
SELECT id, unit_type, tool_type, unit_name, tool_name
|
||||
FROM {table_name}
|
||||
WHERE locked = 0
|
||||
AND ((status & %s) > 0 OR %s = 0)
|
||||
AND (status & %s) = 0
|
||||
ORDER BY id
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
""", (status, status, next_status))
|
||||
|
||||
result = await cur.fetchone()
|
||||
if result:
|
||||
await cur.execute(f"""
|
||||
UPDATE {table_name}
|
||||
SET locked = 1
|
||||
WHERE id = %s
|
||||
""", (result[0],))
|
||||
|
||||
# Commit esplicito per rilasciare il lock
|
||||
await conn.commit()
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
# Rollback in caso di errore
|
||||
await conn.rollback()
|
||||
raise e
|
||||
10
src/utils/csv/parser.py
Normal file
10
src/utils/csv/parser.py
Normal file
@@ -0,0 +1,10 @@
|
||||
import re
|
||||
|
||||
def extract_value(patterns: list, primary_source: str, secondary_source: str, default='Not Defined') -> str:
|
||||
|
||||
for source in (primary_source, secondary_source):
|
||||
for pattern in patterns:
|
||||
matches = re.findall(pattern, source, re.IGNORECASE)
|
||||
if matches:
|
||||
return matches[0] # Return the first match immediately
|
||||
return default # Return default if no matches are found
|
||||
21
src/utils/database/__init__.py
Normal file
21
src/utils/database/__init__.py
Normal file
@@ -0,0 +1,21 @@
|
||||
class WorkflowFlags:
|
||||
CSV_RECEIVED = 0 # 0000
|
||||
DATA_LOADED = 1 # 0001
|
||||
DATA_ELABORATED = 2 # 0010
|
||||
SENT_RAW_DATA = 4 # 0100
|
||||
SENT_ELAB_DATA = 8 # 1000
|
||||
DUMMY_ELABORATED = 16 # 10000
|
||||
|
||||
|
||||
# Mappatura flag -> colonna timestamp
|
||||
FLAG_TO_TIMESTAMP = {
|
||||
WorkflowFlags.CSV_RECEIVED: "inserted_at",
|
||||
WorkflowFlags.DATA_LOADED: "loaded_at",
|
||||
WorkflowFlags.DATA_ELABORATED: "elaborated_at",
|
||||
WorkflowFlags.SENT_RAW_DATA: "sent_raw_at",
|
||||
WorkflowFlags.SENT_ELAB_DATA: "sent_elab_at",
|
||||
WorkflowFlags.DUMMY_ELABORATED: "elaborated_at"
|
||||
}
|
||||
|
||||
# Dimensione degli split della matrice per il caricamento
|
||||
BATCH_SIZE = 1000
|
||||
34
src/utils/database/connection.py
Normal file
34
src/utils/database/connection.py
Normal file
@@ -0,0 +1,34 @@
|
||||
import logging
|
||||
import mysql.connector
|
||||
from mysql.connector import Error
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def connetti_db(cfg: object) -> object:
|
||||
"""
|
||||
Establishes a connection to a MySQL database.
|
||||
|
||||
Args:
|
||||
cfg: A configuration object containing database connection parameters.
|
||||
It should have the following attributes:
|
||||
- dbuser: The database username.
|
||||
- dbpass: The database password.
|
||||
- dbhost: The database host address.
|
||||
- dbport: The database port number.
|
||||
- dbname: The name of the database to connect to.
|
||||
|
||||
Returns:
|
||||
A MySQL connection object if the connection is successful, otherwise None.
|
||||
"""
|
||||
try:
|
||||
conn = mysql.connector.connect(user=cfg.dbuser,
|
||||
password=cfg.dbpass,
|
||||
host=cfg.dbhost,
|
||||
port=cfg.dbport,
|
||||
database=cfg.dbname)
|
||||
conn.autocommit = True
|
||||
logger.info("Connected")
|
||||
return conn
|
||||
except Error as e:
|
||||
logger.error(f"Database connection error: {e}")
|
||||
raise # Re-raise the exception to be handled by the caller
|
||||
60
src/utils/database/elab_query.py
Normal file
60
src/utils/database/elab_query.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import csv
|
||||
from io import StringIO
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def get_data_as_csv(cfg: dict, id_recv: int, unit: str, tool: str, matlab_timestamp: float, pool: object) -> str:
|
||||
"""
|
||||
Retrieves elaborated data from the database and formats it as a CSV string.
|
||||
|
||||
The query selects data from the `ElabDataView` based on `UnitName`, `ToolNameID`,
|
||||
and a `updated_at` timestamp, then orders it. The first row of the CSV will be
|
||||
the column headers.
|
||||
|
||||
Args:
|
||||
cfg (dict): Configuration dictionary (not directly used in the query but passed for consistency).
|
||||
id_recv (int): The ID of the record being processed (used for logging).
|
||||
pool (object): The database connection pool.
|
||||
unit (str): The name of the unit to filter the data.
|
||||
tool (str): The ID of the tool to filter the data.
|
||||
matlab_timestamp (float): A timestamp used to filter data updated after this time.
|
||||
|
||||
Returns:
|
||||
str: A string containing the elaborated data in CSV format.
|
||||
"""
|
||||
query = """
|
||||
select * from (
|
||||
select 'ToolNameID', 'EventDate', 'EventTime', 'NodeNum', 'NodeType', 'NodeDepth',
|
||||
'XShift', 'YShift', 'ZShift' , 'X', 'Y', 'Z', 'HShift', 'HShiftDir', 'HShift_local',
|
||||
'speed', 'speed_local', 'acceleration', 'acceleration_local', 'T_node', 'water_level', 'pressure', 'load_value', 'AlfaX', 'AlfaY', 'CalcErr'
|
||||
union all
|
||||
select ToolNameID, EventDate, EventTime, NodeNum, NodeType, NodeDepth,
|
||||
XShift, YShift, ZShift , X, Y, Z, HShift, HShiftDir, HShift_local,
|
||||
speed, speed_local, acceleration, acceleration_local, T_node, water_level, pressure, load_value, AlfaX, AlfaY, calcerr
|
||||
from ElabDataView
|
||||
where UnitName = %s and ToolNameID = %s and updated_at > %s
|
||||
order by ToolNameID DESC, concat(EventDate, EventTime), convert(`NodeNum`, UNSIGNED INTEGER) DESC
|
||||
) resulting_set
|
||||
"""
|
||||
async with pool.acquire() as conn:
|
||||
async with conn.cursor() as cur:
|
||||
try:
|
||||
await cur.execute(query, (unit, tool, matlab_timestamp))
|
||||
results = await cur.fetchall()
|
||||
logger.info(f"id {id_recv} - {unit} - {tool}: estratti i dati per invio CSV")
|
||||
logger.info(f"Numero di righe estratte: {len(results)}")
|
||||
|
||||
# Creare CSV in memoria
|
||||
output = StringIO()
|
||||
writer = csv.writer(output, delimiter=",", lineterminator="\n", quoting=csv.QUOTE_MINIMAL)
|
||||
for row in results:
|
||||
writer.writerow(row)
|
||||
csv_data = output.getvalue()
|
||||
output.close()
|
||||
|
||||
return csv_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"id {id_recv} - {unit} - {tool} - errore nel query creazione csv: {e}")
|
||||
return None
|
||||
232
src/utils/database/loader_action.py
Normal file
232
src/utils/database/loader_action.py
Normal file
@@ -0,0 +1,232 @@
|
||||
#!.venv/bin/python
|
||||
import logging
|
||||
import asyncio
|
||||
|
||||
from utils.database import FLAG_TO_TIMESTAMP, BATCH_SIZE
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def load_data(cfg: object, matrice_valori: list, pool: object, type: str) -> bool:
|
||||
"""Carica una lista di record di dati grezzi nel database.
|
||||
|
||||
Esegue un'operazione di inserimento massivo (executemany) per caricare i dati.
|
||||
Utilizza la clausola 'ON DUPLICATE KEY UPDATE' per aggiornare i record esistenti.
|
||||
Implementa una logica di re-tentativo in caso di deadlock.
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione contenente i nomi delle tabelle e i parametri di re-tentativo.
|
||||
matrice_valori (list): Una lista di tuple, dove ogni tupla rappresenta una riga da inserire.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
type (str): tipo di caricamento dati. Per GD fa l'update del tool DT corrispondente
|
||||
|
||||
Returns:
|
||||
bool: True se il caricamento ha avuto successo, False altrimenti.
|
||||
"""
|
||||
if not matrice_valori:
|
||||
logger.info("Nulla da caricare.")
|
||||
return True
|
||||
|
||||
if type == "gd" and matrice_valori[0][0] == "RSSI":
|
||||
matrice_valori.pop(0)
|
||||
sql_load_RAWDATA = f"""
|
||||
UPDATE {cfg.dbrawdata} t1
|
||||
JOIN (
|
||||
SELECT id
|
||||
FROM {cfg.dbrawdata}
|
||||
WHERE UnitName = %s AND ToolNameID = %s AND NodeNum = %s
|
||||
AND TIMESTAMP(`EventDate`, `EventTime`) BETWEEN %s AND %s
|
||||
ORDER BY ABS(TIMESTAMPDIFF(SECOND, TIMESTAMP(`EventDate`, `EventTime`), %s))
|
||||
LIMIT 1
|
||||
) t2 ON t1.id = t2.id
|
||||
SET t1.BatLevelModule = %s, t1.TemperatureModule = %s, t1.RssiModule = %s
|
||||
"""
|
||||
else:
|
||||
sql_load_RAWDATA = f"""
|
||||
INSERT INTO {cfg.dbrawdata} (
|
||||
`UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`,
|
||||
`Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`,
|
||||
`Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`,
|
||||
`BatLevelModule`,`TemperatureModule`, `RssiModule`
|
||||
)
|
||||
VALUES (
|
||||
%s, %s, %s, %s, %s, %s, %s,
|
||||
%s, %s, %s, %s, %s, %s, %s, %s,
|
||||
%s, %s, %s, %s, %s, %s, %s, %s,
|
||||
%s, %s, %s
|
||||
) as new_data
|
||||
ON DUPLICATE KEY UPDATE
|
||||
`BatLevel` = IF({cfg.dbrawdata}.`BatLevel` != new_data.`BatLevel`, new_data.`BatLevel`, {cfg.dbrawdata}.`BatLevel`),
|
||||
`Temperature` = IF({cfg.dbrawdata}.`Temperature` != new_data.Temperature, new_data.Temperature, {cfg.dbrawdata}.`Temperature`),
|
||||
`Val0` = IF({cfg.dbrawdata}.`Val0` != new_data.Val0 AND new_data.`Val0` IS NOT NULL, new_data.Val0, {cfg.dbrawdata}.`Val0`),
|
||||
`Val1` = IF({cfg.dbrawdata}.`Val1` != new_data.Val1 AND new_data.`Val1` IS NOT NULL, new_data.Val1, {cfg.dbrawdata}.`Val1`),
|
||||
`Val2` = IF({cfg.dbrawdata}.`Val2` != new_data.Val2 AND new_data.`Val2` IS NOT NULL, new_data.Val2, {cfg.dbrawdata}.`Val2`),
|
||||
`Val3` = IF({cfg.dbrawdata}.`Val3` != new_data.Val3 AND new_data.`Val3` IS NOT NULL, new_data.Val3, {cfg.dbrawdata}.`Val3`),
|
||||
`Val4` = IF({cfg.dbrawdata}.`Val4` != new_data.Val4 AND new_data.`Val4` IS NOT NULL, new_data.Val4, {cfg.dbrawdata}.`Val4`),
|
||||
`Val5` = IF({cfg.dbrawdata}.`Val5` != new_data.Val5 AND new_data.`Val5` IS NOT NULL, new_data.Val5, {cfg.dbrawdata}.`Val5`),
|
||||
`Val6` = IF({cfg.dbrawdata}.`Val6` != new_data.Val6 AND new_data.`Val6` IS NOT NULL, new_data.Val6, {cfg.dbrawdata}.`Val6`),
|
||||
`Val7` = IF({cfg.dbrawdata}.`Val7` != new_data.Val7 AND new_data.`Val7` IS NOT NULL, new_data.Val7, {cfg.dbrawdata}.`Val7`),
|
||||
`Val8` = IF({cfg.dbrawdata}.`Val8` != new_data.Val8 AND new_data.`Val8` IS NOT NULL, new_data.Val8, {cfg.dbrawdata}.`Val8`),
|
||||
`Val9` = IF({cfg.dbrawdata}.`Val9` != new_data.Val9 AND new_data.`Val9` IS NOT NULL, new_data.Val9, {cfg.dbrawdata}.`Val9`),
|
||||
`ValA` = IF({cfg.dbrawdata}.`ValA` != new_data.ValA AND new_data.`ValA` IS NOT NULL, new_data.ValA, {cfg.dbrawdata}.`ValA`),
|
||||
`ValB` = IF({cfg.dbrawdata}.`ValB` != new_data.ValB AND new_data.`ValB` IS NOT NULL, new_data.ValB, {cfg.dbrawdata}.`ValB`),
|
||||
`ValC` = IF({cfg.dbrawdata}.`ValC` != new_data.ValC AND new_data.`ValC` IS NOT NULL, new_data.ValC, {cfg.dbrawdata}.`ValC`),
|
||||
`ValD` = IF({cfg.dbrawdata}.`ValD` != new_data.ValD AND new_data.`ValD` IS NOT NULL, new_data.ValD, {cfg.dbrawdata}.`ValD`),
|
||||
`ValE` = IF({cfg.dbrawdata}.`ValE` != new_data.ValE AND new_data.`ValE` IS NOT NULL, new_data.ValE, {cfg.dbrawdata}.`ValE`),
|
||||
`ValF` = IF({cfg.dbrawdata}.`ValF` != new_data.ValF AND new_data.`ValF` IS NOT NULL, new_data.ValF, {cfg.dbrawdata}.`ValF`),
|
||||
`BatLevelModule` = IF({cfg.dbrawdata}.`BatLevelModule` != new_data.BatLevelModule, new_data.BatLevelModule, {cfg.dbrawdata}.`BatLevelModule`),
|
||||
`TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, {cfg.dbrawdata}.`TemperatureModule`),
|
||||
`RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`),
|
||||
`Created_at` = NOW()
|
||||
"""
|
||||
#logger.info(f"Query insert: {sql_load_RAWDATA}.")
|
||||
#logger.info(f"Matrice valori da inserire: {matrice_valori}.")
|
||||
rc = False
|
||||
async with pool.acquire() as conn:
|
||||
async with conn.cursor() as cur:
|
||||
for attempt in range(cfg.max_retries):
|
||||
try:
|
||||
logger.info(f"Loading data attempt {attempt + 1}.")
|
||||
|
||||
for i in range(0, len(matrice_valori), BATCH_SIZE):
|
||||
batch = matrice_valori[i:i + BATCH_SIZE]
|
||||
|
||||
await cur.executemany(sql_load_RAWDATA, batch)
|
||||
await conn.commit()
|
||||
|
||||
logger.info(f"Completed batch {i//BATCH_SIZE + 1}/{(len(matrice_valori)-1)//BATCH_SIZE + 1}")
|
||||
|
||||
logger.info("Data loaded.")
|
||||
rc = True
|
||||
break
|
||||
except Exception as e:
|
||||
await conn.rollback()
|
||||
logger.error(f"Error: {e}.")
|
||||
# logger.error(f"Matrice valori da inserire: {batch}.")
|
||||
|
||||
if e.args[0] == 1213: # Deadlock detected
|
||||
logger.warning(
|
||||
f"Deadlock detected, attempt {attempt + 1}/{cfg.max_retries}"
|
||||
)
|
||||
|
||||
if attempt < cfg.max_retries - 1:
|
||||
delay = 2 * attempt
|
||||
await asyncio.sleep(delay)
|
||||
continue
|
||||
else:
|
||||
logger.error("Max retry attempts reached for deadlock")
|
||||
raise
|
||||
return rc
|
||||
|
||||
|
||||
async def update_status(cfg: object, id: int, status: str, pool: object) -> None:
|
||||
"""Aggiorna lo stato di un record nella tabella dei record CSV.
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione contenente il nome della tabella.
|
||||
id (int): L'ID del record da aggiornare.
|
||||
status (int): Il nuovo stato da impostare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
async with pool.acquire() as conn:
|
||||
async with conn.cursor() as cur:
|
||||
try:
|
||||
await cur.execute(
|
||||
f"""update {cfg.dbrectable} set
|
||||
status = status | {status},
|
||||
{FLAG_TO_TIMESTAMP[status]} = now()
|
||||
where id = {id}
|
||||
"""
|
||||
)
|
||||
await conn.commit()
|
||||
logger.info(f"Status updated id {id}.")
|
||||
except Exception as e:
|
||||
await conn.rollback()
|
||||
logger.error(f"Error: {e}")
|
||||
|
||||
|
||||
async def unlock(cfg: object, id: int, pool: object) -> None:
|
||||
"""Sblocca un record nella tabella dei record CSV.
|
||||
|
||||
Imposta il campo 'locked' a 0 per un dato ID.
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione contenente il nome della tabella.
|
||||
id (int): L'ID del record da sbloccare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
async with pool.acquire() as conn:
|
||||
async with conn.cursor() as cur:
|
||||
try:
|
||||
await cur.execute(
|
||||
f"update {cfg.dbrectable} set locked = 0 where id = {id}"
|
||||
)
|
||||
await conn.commit()
|
||||
logger.info(f"id {id} unlocked.")
|
||||
except Exception as e:
|
||||
await conn.rollback()
|
||||
logger.error(f"Error: {e}")
|
||||
|
||||
|
||||
async def get_matlab_cmd(cfg: object, unit: str, tool: str, pool: object) -> tuple:
|
||||
"""Recupera le informazioni per l'esecuzione di un comando Matlab dal database.
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
unit (str): Il nome dell'unità.
|
||||
tool (str): Il nome dello strumento.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
|
||||
Returns:
|
||||
tuple: Una tupla contenente le informazioni del comando Matlab, o None in caso di errore.
|
||||
"""
|
||||
async with pool.acquire() as conn:
|
||||
async with conn.cursor() as cur:
|
||||
try:
|
||||
await cur.execute(f'''select m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate
|
||||
from matfuncs as m
|
||||
inner join tools as t on t.matfunc = m.id
|
||||
inner join units as u on u.id = t.unit_id
|
||||
inner join statustools as s on t.statustool_id = s.id
|
||||
where t.name = "{tool}" and u.name = "{unit}"''')
|
||||
return await cur.fetchone()
|
||||
except Exception as e:
|
||||
logger.error(f"Error: {e}")
|
||||
|
||||
async def find_nearest_timestamp(cfg: object, unit_tool_data: dict, pool: object) -> tuple:
|
||||
"""
|
||||
Finds the nearest timestamp in the raw data table based on a reference timestamp
|
||||
and unit/tool/node information.
|
||||
|
||||
Args:
|
||||
cfg (object): Configuration object containing database table name (`cfg.dbrawdata`).
|
||||
unit_tool_data (dict): A dictionary containing:
|
||||
- "timestamp" (str): The reference timestamp string in "%Y-%m-%d %H:%M:%S" format.
|
||||
- "unit" (str): The UnitName to filter by.
|
||||
- "tool" (str): The ToolNameID to filter by.
|
||||
- "node_num" (int): The NodeNum to filter by.
|
||||
pool (object): The database connection pool.
|
||||
|
||||
Returns:
|
||||
tuple: A tuple containing the event timestamp, BatLevel, and Temperature of the
|
||||
nearest record, or None if an error occurs or no record is found.
|
||||
"""
|
||||
|
||||
ref_timestamp = datetime.strptime(unit_tool_data["timestamp"], "%Y-%m-%d %H:%M:%S")
|
||||
start_timestamp = ref_timestamp - timedelta(seconds=45)
|
||||
end_timestamp = ref_timestamp + timedelta(seconds=45)
|
||||
logger.info(f"Find nearest timestamp: {ref_timestamp}")
|
||||
async with pool.acquire() as conn:
|
||||
async with conn.cursor() as cur:
|
||||
try:
|
||||
await cur.execute(f'''SELECT TIMESTAMP(`EventDate`, `EventTime`) AS event_timestamp, BatLevel, Temperature
|
||||
FROM {cfg.dbrawdata}
|
||||
WHERE UnitName = "{unit_tool_data["unit"]}" AND ToolNameID = "{unit_tool_data["tool"]}" AND NodeNum = {unit_tool_data["node_num"]}
|
||||
AND TIMESTAMP(`EventDate`, `EventTime`) BETWEEN "{start_timestamp}" AND "{end_timestamp}"
|
||||
ORDER BY ABS(TIMESTAMPDIFF(SECOND, TIMESTAMP(`EventDate`, `EventTime`), "{ref_timestamp}"))
|
||||
LIMIT 1
|
||||
''')
|
||||
return await cur.fetchone()
|
||||
except Exception as e:
|
||||
logger.error(f"Error: {e}")
|
||||
39
src/utils/database/matlab_query.py
Normal file
39
src/utils/database/matlab_query.py
Normal file
@@ -0,0 +1,39 @@
|
||||
import logging
|
||||
import aiomysql
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def get_matlab_command(cfg: object, tool: str, unit: str, pool: object) -> tuple:
|
||||
"""Recupera le informazioni per l'esecuzione di un comando Matlab dal database.
|
||||
|
||||
Interroga il database per ottenere i dettagli necessari all'avvio di uno script
|
||||
Matlab, basandosi sul nome dello strumento (tool) e dell'unità (unit).
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
tool (str): Il nome dello strumento.
|
||||
unit (str): Il nome dell'unità.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
|
||||
Returns:
|
||||
tuple: Una tupla contenente le informazioni del comando Matlab,
|
||||
o None se non viene trovato alcun comando.
|
||||
"""
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
async with conn.cursor(aiomysql.DictCursor) as cur:
|
||||
await cur.execute(f"""
|
||||
SELECT m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate from matfuncs as m
|
||||
INNER JOIN tools as t on t.matfunc = m.id
|
||||
INNER JOIN units as u on u.id = t.unit_id
|
||||
INNER JOIN statustools as s on t.statustool_id = s.id
|
||||
where t.name = '{tool}' AND u.name = '{unit}';
|
||||
""")
|
||||
|
||||
result = await cur.fetchone()
|
||||
|
||||
if not result:
|
||||
logger.error(f"{unit} - {tool}: Matlab command not found.")
|
||||
return None
|
||||
else:
|
||||
return result
|
||||
46
src/utils/database/nodes_query.py
Normal file
46
src/utils/database/nodes_query.py
Normal file
@@ -0,0 +1,46 @@
|
||||
|
||||
import aiomysql
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def get_nodes_type(cfg: object, tool: str, unit: str, pool: object) -> tuple:
|
||||
"""Recupera le informazioni sui nodi (tipo, canali, input) per un dato strumento e unità.
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
tool (str): Il nome dello strumento.
|
||||
unit (str): Il nome dell'unità.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
|
||||
Returns:
|
||||
tuple: Una tupla contenente quattro liste: canali, tipi, ain, din.
|
||||
Se non vengono trovati risultati, restituisce (None, None, None, None).
|
||||
"""
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
async with conn.cursor(aiomysql.DictCursor) as cur:
|
||||
await cur.execute(f"""
|
||||
SELECT t.name AS name, n.seq AS seq, n.num AS num, n.channels AS channels, y.type AS type, n.ain AS ain, n.din AS din
|
||||
FROM {cfg.dbname}.{cfg.dbnodes} AS n
|
||||
INNER JOIN tools AS t ON t.id = n.tool_id
|
||||
INNER JOIN units AS u ON u.id = t.unit_id
|
||||
INNER JOIN nodetypes AS y ON n.nodetype_id = y.id
|
||||
WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}'
|
||||
ORDER BY n.num;
|
||||
""")
|
||||
|
||||
results = await cur.fetchall()
|
||||
logger.info(f"{unit} - {tool}: {cur.rowcount} rows selected to get node type/Ain/Din/channels.")
|
||||
|
||||
if not results:
|
||||
logger.info(f"{unit} - {tool}: Node/Channels/Ain/Din not defined.")
|
||||
return None, None, None, None
|
||||
else:
|
||||
channels, types, ains, dins = [], [], [], []
|
||||
for row in results:
|
||||
channels.append(row['channels'])
|
||||
types.append(row['type'])
|
||||
ains.append(row['ain'])
|
||||
dins.append(row['din'])
|
||||
return channels, types, ains, dins
|
||||
0
src/utils/ftp/__init__.py
Normal file
0
src/utils/ftp/__init__.py
Normal file
52
src/utils/ftp/file_management.py
Normal file
52
src/utils/ftp/file_management.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import os
|
||||
import logging
|
||||
|
||||
import mysql.connector
|
||||
|
||||
from utils.database.connection import connetti_db
|
||||
|
||||
from utils.csv.parser import extract_value
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def on_file_received(self: object, file: str) -> None:
|
||||
"""Handles the event when a file is successfully received.
|
||||
|
||||
Args:
|
||||
file: The path to the received file.
|
||||
"""
|
||||
if not os.stat(file).st_size:
|
||||
os.remove(file)
|
||||
logger.info(f'File {file} is empty: removed.')
|
||||
else:
|
||||
cfg = self.cfg
|
||||
path, filenameExt = os.path.split(file)
|
||||
filename, fileExtension = os.path.splitext(filenameExt)
|
||||
if (fileExtension.upper() in (cfg.fileext)):
|
||||
with open(file, 'r', encoding='utf-8', errors='ignore') as csvfile:
|
||||
lines = csvfile.readlines()
|
||||
|
||||
unit_name = extract_value(cfg.units_name, filename, str(lines[0:10]))
|
||||
unit_type = extract_value(cfg.units_type, filename, str(lines[0:10]))
|
||||
tool_name = extract_value(cfg.tools_name, filename, str(lines[0:10]))
|
||||
tool_type = extract_value(cfg.tools_type, filename, str(lines[0:10]))
|
||||
|
||||
try:
|
||||
conn = connetti_db(cfg)
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error: {e}")
|
||||
logger.error(f'{e}')
|
||||
|
||||
# Create a cursor
|
||||
cur = conn.cursor()
|
||||
try:
|
||||
cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s, %s, %s, %s, %s, %s)", (filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), ''.join(lines)))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'File {file} not loaded. Held in user path.')
|
||||
logger.error(f'{e}')
|
||||
else:
|
||||
os.remove(file)
|
||||
logger.info(f'File {file} loaded: removed.')
|
||||
142
src/utils/ftp/send_data.py
Normal file
142
src/utils/ftp/send_data.py
Normal file
@@ -0,0 +1,142 @@
|
||||
from ftplib import FTP, FTP_TLS, all_errors
|
||||
from io import BytesIO
|
||||
import logging
|
||||
import aiomysql
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class FTPConnection:
|
||||
"""
|
||||
Manages an FTP or FTP_TLS connection, providing a context manager for automatic disconnection.
|
||||
"""
|
||||
def __init__(self, host, port=21, use_tls=False, user='', passwd='',
|
||||
passive=True, timeout=None, debug=0, context=None):
|
||||
|
||||
self.use_tls = use_tls
|
||||
|
||||
if use_tls:
|
||||
self.ftp = FTP_TLS(context=context, timeout=timeout) if context else FTP_TLS(timeout=timeout)
|
||||
else:
|
||||
self.ftp = FTP(timeout=timeout)
|
||||
|
||||
if debug > 0:
|
||||
self.ftp.set_debuglevel(debug)
|
||||
|
||||
self.ftp.connect(host, port)
|
||||
self.ftp.login(user, passwd)
|
||||
self.ftp.set_pasv(passive)
|
||||
|
||||
if use_tls:
|
||||
self.ftp.prot_p()
|
||||
|
||||
def __getattr__(self, name):
|
||||
"""Delega tutti i metodi non definiti all'oggetto FTP sottostante"""
|
||||
return getattr(self.ftp, name)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.ftp.quit()
|
||||
|
||||
async def send_raw_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
|
||||
None
|
||||
return True
|
||||
|
||||
async def send_elab_csv_to_customer(cfg: dict, id: int, unit: str, tool: str, csv_data: str, pool: object) -> bool:
|
||||
"""
|
||||
Sends elaborated CSV data to a customer via FTP.
|
||||
|
||||
Retrieves FTP connection details from the database based on the unit name,
|
||||
then establishes an FTP connection and uploads the CSV data.
|
||||
|
||||
Args:
|
||||
cfg (dict): Configuration dictionary (not directly used in this function but passed for consistency).
|
||||
id (int): The ID of the record being processed (used for logging).
|
||||
unit (str): The name of the unit associated with the data.
|
||||
tool (str): The name of the tool associated with the data.
|
||||
csv_data (str): The CSV data as a string to be sent.
|
||||
pool (object): The database connection pool.
|
||||
|
||||
Returns:
|
||||
bool: True if the CSV data was sent successfully, False otherwise.
|
||||
"""
|
||||
query = """
|
||||
select ftp_addrs, ftp_user, ftp_passwd, ftp_parm, ftp_filename, ftp_target, duedate from units
|
||||
where name = '%s'";'
|
||||
"""
|
||||
async with pool.acquire() as conn:
|
||||
async with conn.cursor(aiomysql.DictCursor) as cur:
|
||||
try:
|
||||
await cur.execute(query, (unit,))
|
||||
send_ftp_info = await cur.fetchone()
|
||||
logger.info(f"id {id} - {unit} - {tool}: estratti i dati per invio via ftp")
|
||||
except Exception as e:
|
||||
logger.error(f"id {id} - {unit} - {tool} - errore nel query per invio ftp: {e}")
|
||||
|
||||
try:
|
||||
# Converti in bytes
|
||||
csv_bytes = csv_data.encode('utf-8')
|
||||
csv_buffer = BytesIO(csv_bytes)
|
||||
|
||||
ftp_parms = parse_ftp_parms(send_ftp_info["ftp_parm"])
|
||||
use_tls = 'ssl_version' in ftp_parms
|
||||
passive = ftp_parms.get('passive', True)
|
||||
port = ftp_parms.get('port', 21)
|
||||
|
||||
# Connessione FTP
|
||||
with FTPConnection(host=send_ftp_info["ftp_addrs"], port=port, use_tls=use_tls, user=send_ftp_info["ftp_user"], passwd=send_ftp_info["ftp_passwd"], passive=passive) as ftp:
|
||||
|
||||
# Cambia directory
|
||||
if send_ftp_info["ftp_target"] != "/":
|
||||
ftp.cwd(send_ftp_info["ftp_target"])
|
||||
|
||||
# Invia il file
|
||||
result = ftp.storbinary(f'STOR {send_ftp_info["ftp_filename"]}', csv_buffer)
|
||||
|
||||
if result.startswith('226'):
|
||||
logger.info(f"File {send_ftp_info["ftp_filename"]} inviato con successo")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"Errore nell'invio: {result}")
|
||||
return False
|
||||
|
||||
except all_errors as e:
|
||||
logger.error(f"Errore FTP: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Errore generico: {e}")
|
||||
return False
|
||||
finally:
|
||||
csv_buffer.close()
|
||||
|
||||
def parse_ftp_parms(ftp_parms: str) -> dict:
|
||||
"""
|
||||
Parses a string of FTP parameters into a dictionary.
|
||||
|
||||
Args:
|
||||
ftp_parms (str): A string containing key-value pairs separated by commas,
|
||||
with keys and values separated by '=>'.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary where keys are parameter names (lowercase) and values are their parsed values.
|
||||
"""
|
||||
# Rimuovere spazi e dividere per virgola
|
||||
pairs = ftp_parms.split(',')
|
||||
result = {}
|
||||
|
||||
for pair in pairs:
|
||||
if '=>' in pair:
|
||||
key, value = pair.split('=>', 1)
|
||||
key = key.strip().lower()
|
||||
value = value.strip().lower()
|
||||
|
||||
# Convertire i valori appropriati
|
||||
if value.isdigit():
|
||||
value = int(value)
|
||||
elif value == '':
|
||||
value = None
|
||||
|
||||
result[key] = value
|
||||
|
||||
return result
|
||||
159
src/utils/ftp/user_admin.py
Normal file
159
src/utils/ftp/user_admin.py
Normal file
@@ -0,0 +1,159 @@
|
||||
import os
|
||||
import mysql.connector
|
||||
import logging
|
||||
|
||||
from hashlib import sha256
|
||||
from pathlib import Path
|
||||
|
||||
from utils.database.connection import connetti_db
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def ftp_SITE_ADDU(self: object, line: str) -> None:
|
||||
"""
|
||||
Adds a virtual user, creates their directory, and saves their details to the database.
|
||||
|
||||
Args:
|
||||
line (str): A string containing the username and password separated by a space.
|
||||
"""
|
||||
cfg = self.cfg
|
||||
try:
|
||||
parms = line.split()
|
||||
user = os.path.basename(parms[0]) # Extract the username
|
||||
password = parms[1] # Get the password
|
||||
hash = sha256(password.encode("UTF-8")).hexdigest() # Hash the password
|
||||
except IndexError:
|
||||
self.respond('501 SITE ADDU failed. Command needs 2 arguments')
|
||||
else:
|
||||
try:
|
||||
# Create the user's directory
|
||||
Path(cfg.virtpath + user).mkdir(parents=True, exist_ok=True)
|
||||
except Exception as e:
|
||||
self.respond(f'551 Error in create virtual user path: {e}')
|
||||
else:
|
||||
try:
|
||||
# Add the user to the authorizer
|
||||
self.authorizer.add_user(str(user),
|
||||
hash, cfg.virtpath + "/" + user, perm=cfg.defperm)
|
||||
# Save the user to the database
|
||||
# Define the database connection
|
||||
try:
|
||||
conn = connetti_db(cfg)
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error: {e}")
|
||||
logger.error(f'{e}')
|
||||
|
||||
# Create a cursor
|
||||
cur = conn.cursor()
|
||||
cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbusertable} (ftpuser, hash, virtpath, perm) VALUES ('{user}', '{hash}', '{cfg.virtpath + user}', '{cfg.defperm}')")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
logger.info(f"User {user} created.")
|
||||
self.respond('200 SITE ADDU successful.')
|
||||
except Exception as e:
|
||||
self.respond(f'501 SITE ADDU failed: {e}.')
|
||||
print(e)
|
||||
|
||||
def ftp_SITE_DISU(self: object, line: str) -> None:
|
||||
"""
|
||||
Removes a virtual user from the authorizer and marks them as deleted in the database.
|
||||
|
||||
Args:
|
||||
line (str): A string containing the username to be disabled.
|
||||
"""
|
||||
cfg = self.cfg
|
||||
parms = line.split()
|
||||
user = os.path.basename(parms[0]) # Extract the username
|
||||
try:
|
||||
# Remove the user from the authorizer
|
||||
self.authorizer.remove_user(str(user))
|
||||
# Delete the user from database
|
||||
try:
|
||||
conn = connetti_db(cfg)
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error: {e}")
|
||||
logger.error(f'{e}')
|
||||
|
||||
# Crea un cursore
|
||||
cur = conn.cursor()
|
||||
cur.execute(f"UPDATE {cfg.dbname}.{cfg.dbusertable} SET disabled_at = now() WHERE ftpuser = '{user}'")
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
logger.info(f"User {user} deleted.")
|
||||
self.respond('200 SITE DISU successful.')
|
||||
except Exception as e:
|
||||
self.respond('501 SITE DISU failed.')
|
||||
print(e)
|
||||
|
||||
def ftp_SITE_ENAU(self: object, line: str) -> None:
|
||||
"""
|
||||
Restores a virtual user by updating their status in the database and adding them back to the authorizer.
|
||||
|
||||
Args:
|
||||
line (str): A string containing the username to be enabled.
|
||||
"""
|
||||
cfg = self.cfg
|
||||
parms = line.split()
|
||||
user = os.path.basename(parms[0]) # Extract the username
|
||||
try:
|
||||
# Restore the user into database
|
||||
try:
|
||||
conn = connetti_db(cfg)
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error: {e}")
|
||||
logger.error(f'{e}')
|
||||
|
||||
# Crea un cursore
|
||||
cur = conn.cursor()
|
||||
try:
|
||||
cur.execute(f"UPDATE {cfg.dbname}.{cfg.dbusertable} SET disabled_at = null WHERE ftpuser = '{user}'")
|
||||
conn.commit()
|
||||
except Exception as e:
|
||||
logger.error(f"Update DB failed: {e}")
|
||||
|
||||
cur.execute(f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbname}.{cfg.dbusertable} WHERE ftpuser = '{user}'")
|
||||
|
||||
ftpuser, hash, virtpath, perm = cur.fetchone()
|
||||
self.authorizer.add_user(ftpuser, hash, virtpath, perm)
|
||||
try:
|
||||
Path(cfg.virtpath + ftpuser).mkdir(parents=True, exist_ok=True)
|
||||
except Exception as e:
|
||||
self.responde(f'551 Error in create virtual user path: {e}')
|
||||
|
||||
conn.close()
|
||||
|
||||
logger.info(f"User {user} restored.")
|
||||
self.respond('200 SITE ENAU successful.')
|
||||
|
||||
except Exception as e:
|
||||
self.respond('501 SITE ENAU failed.')
|
||||
print(e)
|
||||
|
||||
def ftp_SITE_LSTU(self: object, line: str) -> None:
|
||||
"""
|
||||
Lists all virtual users from the database.
|
||||
|
||||
Args:
|
||||
line (str): An empty string (no arguments needed for this command).
|
||||
"""
|
||||
cfg = self.cfg
|
||||
users_list = []
|
||||
try:
|
||||
# Connect to the SQLite database to fetch users
|
||||
try:
|
||||
conn = connetti_db(cfg)
|
||||
except mysql.connector.Error as e:
|
||||
print(f"Error: {e}")
|
||||
logger.error(f'{e}')
|
||||
|
||||
# Crea un cursore
|
||||
cur = conn.cursor()
|
||||
self.push("214-The following virtual users are defined:\r\n")
|
||||
cur.execute(f'SELECT ftpuser, perm, disabled_at FROM {cfg.dbname}.{cfg.dbusertable}')
|
||||
[users_list.append(f'Username: {ftpuser}\tPerms: {perm}\tDisabled: {disabled_at}\r\n') for ftpuser, perm, disabled_at in cur.fetchall()]
|
||||
self.push(''.join(users_list))
|
||||
self.respond("214 LSTU SITE command successful.")
|
||||
|
||||
except Exception as e:
|
||||
self.respond(f'501 list users failed: {e}')
|
||||
104
src/utils/orchestrator_utils.py
Normal file
104
src/utils/orchestrator_utils.py
Normal file
@@ -0,0 +1,104 @@
|
||||
import logging
|
||||
import asyncio
|
||||
import os
|
||||
import aiomysql
|
||||
import contextvars
|
||||
from typing import Callable, Coroutine, Any
|
||||
|
||||
# Crea una context variable per identificare il worker
|
||||
worker_context = contextvars.ContextVar("worker_id", default="^-^")
|
||||
|
||||
|
||||
# Formatter personalizzato che include il worker_id
|
||||
class WorkerFormatter(logging.Formatter):
|
||||
"""Formatter personalizzato per i log che include l'ID del worker."""
|
||||
|
||||
def format(self, record: logging.LogRecord) -> str:
|
||||
"""Formatta il record di log includendo l'ID del worker.
|
||||
|
||||
Args:
|
||||
record (str): Il record di log da formattare.
|
||||
|
||||
Returns:
|
||||
La stringa formattata del record di log.
|
||||
"""
|
||||
record.worker_id = worker_context.get()
|
||||
return super().format(record)
|
||||
|
||||
|
||||
def setup_logging(log_filename: str, log_level_str: str):
|
||||
"""Configura il logging globale.
|
||||
|
||||
Args:
|
||||
log_filename (str): Percorso del file di log.
|
||||
log_level_str (str): Livello di log (es. "INFO", "DEBUG").
|
||||
"""
|
||||
logger = logging.getLogger()
|
||||
handler = logging.FileHandler(log_filename)
|
||||
formatter = WorkerFormatter(
|
||||
"%(asctime)s - PID: %(process)d.Worker-%(worker_id)s.%(name)s.%(funcName)s.%(levelname)s: %(message)s"
|
||||
)
|
||||
handler.setFormatter(formatter)
|
||||
|
||||
# Rimuovi eventuali handler esistenti e aggiungi il nostro
|
||||
if logger.hasHandlers():
|
||||
logger.handlers.clear()
|
||||
logger.addHandler(handler)
|
||||
log_level = getattr(logging, log_level_str.upper(), logging.INFO)
|
||||
logger.setLevel(log_level)
|
||||
logger.info("Logging configurato correttamente")
|
||||
|
||||
|
||||
async def run_orchestrator(
|
||||
config_class: Any,
|
||||
worker_coro: Callable[[int, Any, Any], Coroutine[Any, Any, None]],
|
||||
):
|
||||
"""Funzione principale che inizializza e avvia un orchestratore.
|
||||
|
||||
Args:
|
||||
config_class: La classe di configurazione da istanziare.
|
||||
worker_coro: La coroutine del worker da eseguire in parallelo.
|
||||
"""
|
||||
logger = logging.getLogger()
|
||||
logger.info("Avvio del sistema...")
|
||||
|
||||
cfg = config_class()
|
||||
logger.info("Configurazione caricata correttamente")
|
||||
|
||||
debug_mode = False
|
||||
try:
|
||||
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
|
||||
setup_logging(cfg.logfilename, log_level)
|
||||
debug_mode = logger.getEffectiveLevel() == logging.DEBUG
|
||||
|
||||
logger.info(f"Avvio di {cfg.max_threads} worker concorrenti")
|
||||
|
||||
pool = await aiomysql.create_pool(
|
||||
host=cfg.dbhost,
|
||||
user=cfg.dbuser,
|
||||
password=cfg.dbpass,
|
||||
db=cfg.dbname,
|
||||
minsize=cfg.max_threads,
|
||||
maxsize=cfg.max_threads * 4,
|
||||
pool_recycle=3600,
|
||||
)
|
||||
|
||||
tasks = [
|
||||
asyncio.create_task(worker_coro(i, cfg, pool))
|
||||
for i in range(cfg.max_threads)
|
||||
]
|
||||
|
||||
|
||||
logger.info("Sistema avviato correttamente. In attesa di nuovi task...")
|
||||
|
||||
try:
|
||||
await asyncio.gather(*tasks, return_exceptions=debug_mode)
|
||||
finally:
|
||||
pool.close()
|
||||
await pool.wait_closed()
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Info: Shutdown richiesto... chiusura in corso")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Errore principale: {e}", exc_info=debug_mode)
|
||||
1
src/utils/parsers/__init__.py
Normal file
1
src/utils/parsers/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Parser delle centraline con le tipologie di unit e tool"""
|
||||
1
src/utils/parsers/by_name/__init__.py
Normal file
1
src/utils/parsers/by_name/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Parser delle centraline con nomi di unit e tool"""
|
||||
1
src/utils/parsers/by_type/__init__.py
Normal file
1
src/utils/parsers/by_type/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Parser delle centraline"""
|
||||
15
src/utils/parsers/by_type/cr1000x_cr1000x.py
Normal file
15
src/utils/parsers/by_type/cr1000x_cr1000x.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as pipe_sep_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'cr1000x_cr1000x'.
|
||||
|
||||
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
|
||||
di elaborazione come "pipe_separator".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
|
||||
15
src/utils/parsers/by_type/d2w_d2w.py
Normal file
15
src/utils/parsers/by_type/d2w_d2w.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as pipe_sep_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'd2w_d2w'.
|
||||
|
||||
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
|
||||
di elaborazione come "pipe_separator".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
|
||||
15
src/utils/parsers/by_type/g201_g201.py
Normal file
15
src/utils/parsers/by_type/g201_g201.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as channels_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'g201_g201'.
|
||||
|
||||
Questa funzione è un wrapper per `channels_main_loader` e passa il tipo
|
||||
di elaborazione come "channels".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await channels_main_loader(cfg, id, pool,"channels")
|
||||
15
src/utils/parsers/by_type/g301_g301.py
Normal file
15
src/utils/parsers/by_type/g301_g301.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as pipe_sep_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'g301_g301'.
|
||||
|
||||
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
|
||||
di elaborazione come "pipe_separator".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
|
||||
15
src/utils/parsers/by_type/g801_iptm.py
Normal file
15
src/utils/parsers/by_type/g801_iptm.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as pipe_sep_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'g801_iptm'.
|
||||
|
||||
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
|
||||
di elaborazione come "pipe_separator".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
|
||||
15
src/utils/parsers/by_type/g801_loc.py
Normal file
15
src/utils/parsers/by_type/g801_loc.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as analog_dig_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'g801_loc'.
|
||||
|
||||
Questa funzione è un wrapper per `analog_dig_main_loader` e passa il tipo
|
||||
di elaborazione come "analogic_digital".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await analog_dig_main_loader(cfg, id, pool, "analogic_digital")
|
||||
15
src/utils/parsers/by_type/g801_mums.py
Normal file
15
src/utils/parsers/by_type/g801_mums.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as pipe_sep_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'g801_mums'.
|
||||
|
||||
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
|
||||
di elaborazione come "pipe_separator".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
|
||||
15
src/utils/parsers/by_type/g801_musa.py
Normal file
15
src/utils/parsers/by_type/g801_musa.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as musa_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'g801_musa'.
|
||||
|
||||
Questa funzione è un wrapper per `musa_main_loader` e passa il tipo
|
||||
di elaborazione come "musa".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await musa_main_loader(cfg, id, pool, "musa")
|
||||
15
src/utils/parsers/by_type/g801_mux.py
Normal file
15
src/utils/parsers/by_type/g801_mux.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as channels_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'g801_mux'.
|
||||
|
||||
Questa funzione è un wrapper per `channels_main_loader` e passa il tipo
|
||||
di elaborazione come "channels".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await channels_main_loader(cfg, id, pool, "channels")
|
||||
15
src/utils/parsers/by_type/g802_dsas.py
Normal file
15
src/utils/parsers/by_type/g802_dsas.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as pipe_sep_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'g802_dsas'.
|
||||
|
||||
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
|
||||
di elaborazione come "pipe_separator".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
|
||||
15
src/utils/parsers/by_type/g802_gd.py
Normal file
15
src/utils/parsers/by_type/g802_gd.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as gd_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'g802_gd'.
|
||||
|
||||
Questa funzione è un wrapper per `gd_main_loader` e passa il tipo
|
||||
di elaborazione come "gd".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await gd_main_loader(cfg, id, pool, "gd")
|
||||
15
src/utils/parsers/by_type/g802_loc.py
Normal file
15
src/utils/parsers/by_type/g802_loc.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as analog_dig_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'g802_loc'.
|
||||
|
||||
Questa funzione è un wrapper per `analog_dig_main_loader` e passa il tipo
|
||||
di elaborazione come "analogic_digital".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await analog_dig_main_loader(cfg, id, pool, "analogic_digital")
|
||||
15
src/utils/parsers/by_type/g802_modb.py
Normal file
15
src/utils/parsers/by_type/g802_modb.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as pipe_sep_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'g802_modb'.
|
||||
|
||||
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
|
||||
di elaborazione come "pipe_separator".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
|
||||
15
src/utils/parsers/by_type/g802_mums.py
Normal file
15
src/utils/parsers/by_type/g802_mums.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as pipe_sep_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'g802_mums'.
|
||||
|
||||
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
|
||||
di elaborazione come "pipe_separator".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
|
||||
15
src/utils/parsers/by_type/g802_mux.py
Normal file
15
src/utils/parsers/by_type/g802_mux.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as channels_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'g802_mux'.
|
||||
|
||||
Questa funzione è un wrapper per `channels_main_loader` e passa il tipo
|
||||
di elaborazione come "channels".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await channels_main_loader(cfg, id, pool, "channels")
|
||||
15
src/utils/parsers/by_type/gs1_gs1.py
Normal file
15
src/utils/parsers/by_type/gs1_gs1.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as tlp_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'gs1_gs1'.
|
||||
|
||||
Questa funzione è un wrapper per `tlp_main_loader` e passa il tipo
|
||||
di elaborazione come "tlp".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await tlp_main_loader(cfg, id, pool, "tlp")
|
||||
15
src/utils/parsers/by_type/hortus_hortus.py
Normal file
15
src/utils/parsers/by_type/hortus_hortus.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as pipe_sep_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'hortus_hortus'.
|
||||
|
||||
Questa funzione è un wrapper per `pipe_sep_main_loader` e passa il tipo
|
||||
di elaborazione come "pipe_separator".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await pipe_sep_main_loader(cfg, id, pool, "pipe_separator")
|
||||
35
src/utils/parsers/by_type/isi_csv_log_vulink.py
Normal file
35
src/utils/parsers/by_type/isi_csv_log_vulink.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import subprocess
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
from utils.database.loader_action import DATA_LOADED, update_status, unlock
|
||||
from utils.csv.data_preparation import get_data
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
|
||||
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
|
||||
# Creare un file temporaneo
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as temp_file:
|
||||
temp_file.write(ToolData)
|
||||
temp_filename = temp_file.name
|
||||
|
||||
try:
|
||||
# Eseguire il programma con il file temporaneo
|
||||
result = await subprocess.run(['python3', 'old_script/TS_PiniScript.py', temp_filename], capture_output=True, text=True)
|
||||
print(result.stdout)
|
||||
print(result.stderr)
|
||||
finally:
|
||||
# Pulire il file temporaneo
|
||||
os.unlink(temp_filename)
|
||||
|
||||
if result.returncode != 0:
|
||||
logger.error(f"Errore nell'esecuzione del programma TS_PiniScript.py: {result.stderr}")
|
||||
raise Exception(f"Errore nel programma: {result.stderr}")
|
||||
else:
|
||||
logger.info(f"Programma TS_PiniScript.py eseguito con successo: {result.stdout}")
|
||||
await update_status(cfg, id, DATA_LOADED, pool)
|
||||
await unlock(cfg, id, pool)
|
||||
15
src/utils/parsers/by_type/tlp_loc.py
Normal file
15
src/utils/parsers/by_type/tlp_loc.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as analog_dig_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'tlp_loc'.
|
||||
|
||||
Questa funzione è un wrapper per `analog_dig_main_loader` e passa il tipo
|
||||
di elaborazione come "analogic_digital".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await analog_dig_main_loader(cfg, id, pool, "analogic_digital")
|
||||
15
src/utils/parsers/by_type/tlp_tlp.py
Normal file
15
src/utils/parsers/by_type/tlp_tlp.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from utils.csv.loaders import main_loader as tlp_main_loader
|
||||
|
||||
async def main_loader(cfg: object, id: int, pool: object) -> None:
|
||||
"""
|
||||
Carica ed elabora i dati CSV specifici per il tipo 'tlp_tlp'.
|
||||
|
||||
Questa funzione è un wrapper per `tlp_main_loader` e passa il tipo
|
||||
di elaborazione come "tlp".
|
||||
|
||||
Args:
|
||||
cfg (object): L'oggetto di configurazione.
|
||||
id (int): L'ID del record CSV da elaborare.
|
||||
pool (object): Il pool di connessioni al database.
|
||||
"""
|
||||
await tlp_main_loader(cfg, id, pool, "tlp")
|
||||
0
src/utils/timestamp/__init__.py
Normal file
0
src/utils/timestamp/__init__.py
Normal file
37
src/utils/timestamp/date_check.py
Normal file
37
src/utils/timestamp/date_check.py
Normal file
@@ -0,0 +1,37 @@
|
||||
from datetime import datetime
|
||||
|
||||
def normalizza_data(data_string: str)->str:
|
||||
"""
|
||||
Normalizza una stringa di data al formato YYYY-MM-DD, provando diversi formati di input.
|
||||
|
||||
Args:
|
||||
data_string (str): La stringa di data da normalizzare.
|
||||
|
||||
Returns:
|
||||
str: La data normalizzata nel formato YYYY-MM-DD,
|
||||
o None se la stringa non può essere interpretata come una data.
|
||||
"""
|
||||
formato_desiderato = "%Y-%m-%d"
|
||||
formati_input = ["%Y/%m/%d", "%Y-%m-%d", "%d-%m-%Y","%d/%m/%Y", ] # Ordine importante: prova prima il più probabile
|
||||
|
||||
for formato_input in formati_input:
|
||||
try:
|
||||
data_oggetto = datetime.strptime(data_string, formato_input)
|
||||
return data_oggetto.strftime(formato_desiderato)
|
||||
except ValueError:
|
||||
continue # Prova il formato successivo se quello attuale fallisce
|
||||
|
||||
return None # Se nessun formato ha avuto successo
|
||||
|
||||
def normalizza_orario(orario_str):
|
||||
try:
|
||||
# Prova prima con HH:MM:SS
|
||||
dt = datetime.strptime(orario_str, "%H:%M:%S")
|
||||
return dt.strftime("%H:%M:%S")
|
||||
except ValueError:
|
||||
try:
|
||||
# Se fallisce, prova con HH:MM
|
||||
dt = datetime.strptime(orario_str, "%H:%M")
|
||||
return dt.strftime("%H:%M:%S")
|
||||
except ValueError:
|
||||
return orario_str # Restituisce originale se non parsabile
|
||||
Reference in New Issue
Block a user