This commit is contained in:
2025-05-11 10:01:23 +02:00
parent e9dc7c1192
commit 1dfb1a2efa
25 changed files with 231 additions and 94 deletions

14
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,14 @@
{
// Usare IntelliSense per informazioni sui possibili attributi.
// Al passaggio del mouse vengono visualizzate le descrizioni degli attributi esistenti.
// Per altre informazioni, visitare: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Python File",
"type": "debugpy",
"request": "launch",
"program": "${file}"
}
]
}

View File

@@ -12,5 +12,6 @@ CREATE TABLE `received` (
`status` int DEFAULT '0', `status` int DEFAULT '0',
`inserted_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP, `inserted_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`loaded_at` datetime DEFAULT NULL, `loaded_at` datetime DEFAULT NULL,
`elaborated_at` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`) PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=694 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; ) ENGINE=InnoDB AUTO_INCREMENT=694 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

76
elab_orchestrator.py Executable file
View File

@@ -0,0 +1,76 @@
#!.venv/bin/python
# Import necessary libraries
import mysql.connector
import logging
import importlib
import time
import asyncio
import subprocess
# Import custom modules for configuration and database connection
from utils.config import loader as setting
from utils.database.connection import connetti_db
from utils.database.loader_action import DATA_LOADED, get_matlab_cmd
# Initialize the logger for this module
logger = logging.getLogger(__name__)
# Function to elaborate CSV data
async def run_matlab_elab(id: int, unit_name: str, unit_type: str, tool_name: str, tool_type: str, semaphore: asyncio.Semaphore) -> bool:
async with semaphore:
if get_matlab_cmd(cfg, unit_name, tool_name):
# If a record is found, lock it by updating the 'locked' field to 1
async def main():
# Load the configuration settings
cfg = setting.Config()
try:
# Configure logging to write log messages to a file with a specific format
logging.basicConfig(
format="%(asctime)s - PID: %(process)d.%(name)s.%(levelname)s: %(message)s ",
filename=cfg.logfilename,
level=logging.INFO,
)
# Limita il numero di esecuzioni concorrenti a max_threads
semaphore = asyncio.Semaphore(cfg.max_threads)
running_tasks = set()
# Enter an infinite loop to continuously process records
while True:
try:
# Establish a database connection
with connetti_db(cfg) as conn:
cur = conn.cursor()
# Select a single record from the raw data table that is not currently locked and has a status of 0
cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = {DATA_LOADED} limit 1')
id, unit_name, unit_type, tool_name, tool_type = cur.fetchone()
if id:
task = asyncio.create_task(run_matlab_elab(id, unit_name, unit_type, tool_name, tool_type, semaphore))
running_tasks.add(task)
# Rimuovi i task completati dal set
running_tasks = {t for t in running_tasks if not t.done()}
# If a record was successfully processed, log the number of threads currently running
#logger.info(f"Threads in execution: {len(threads)}")
except Exception as e:
logger.info(f"Error: {e}.")
except KeyboardInterrupt:
# Handle a keyboard interrupt (e.g., Ctrl+C) to gracefully shut down the program
logger.info("Info: Shutdown requested...exiting")
except Exception as e:
logger.info(f"Error: {e}.")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -27,18 +27,17 @@
user = root user = root
password = batt1l0 password = batt1l0
dbName = ase_lar dbName = ase_lar
dbSchema = public
userTableName = virtusers userTableName = virtusers
recTableName = received recTableName = received
rawTableName = RAWDATACOR rawTableName = RAWDATACOR
nodesTableName = nodes nodesTableName = nodes
[unit] [unit]
Types = G801|G201|G301|G802|D2W|GFLOW|CR1000X|TLP|GS1 Types = G801|G201|G301|G802|D2W|GFLOW|CR1000X|TLP|GS1|HORTUS
Names = ID[0-9]{4}|IX[0-9]{4} Names = ID[0-9]{4}|IX[0-9]{4}
[tool] [tool]
Types = MUX|MUMS|MODB|IPTM|MUSA|LOC|GD|D2W|CR1000X|G301|NESA|GS1|G201|TLP|DSAS Types = MUX|MUMS|MODB|IPTM|MUSA|LOC|GD|D2W|CR1000X|G301|NESA|GS1|G201|TLP|DSAS|HORTUS
Names = LOC[0-9]{4}|DT[0-9]{4}|GD[0-9]{4}|[0-9]{18}|measurement Names = LOC[0-9]{4}|DT[0-9]{4}|GD[0-9]{4}|[0-9]{18}|measurement
[csv] [csv]

View File

@@ -10,18 +10,19 @@ import threading
# Import custom modules for configuration and database connection # Import custom modules for configuration and database connection
from utils.config import loader as setting from utils.config import loader as setting
from utils.database.connection import connetti_db from utils.database.connection import connetti_db
from utils.database.loader_action import CSV_RECEIVED
# Initialize the logger for this module # Initialize the logger for this module
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Function to elaborate CSV data # Function to elaborate CSV data
def elab_csv(cfg: object, threads: list) -> bool: def load_csv(cfg: object, threads: list) -> bool:
try: try:
# Establish a database connection # Establish a database connection
with connetti_db(cfg) as conn: with connetti_db(cfg) as conn:
cur = conn.cursor() cur = conn.cursor()
# Select a single record from the raw data table that is not currently locked and has a status of 0 # Select a single record from the raw data table that is not currently locked and has a status of 0
cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = 0 limit 1') cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbname}.{cfg.dbrectable} where locked = 0 and status = {CSV_RECEIVED} limit 1')
id, unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone() id, unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone()
if id: if id:
# If a record is found, lock it by updating the 'locked' field to 1 # If a record is found, lock it by updating the 'locked' field to 1
@@ -79,7 +80,7 @@ def main():
# Remove it from the list of active threads # Remove it from the list of active threads
threads.remove(thread) threads.remove(thread)
# Attempt to process a CSV record # Attempt to process a CSV record
if elab_csv(cfg, threads): if load_csv(cfg, threads):
# If a record was successfully processed, log the number of threads currently running # If a record was successfully processed, log the number of threads currently running
logger.info(f"Threads in execution: {len(threads)}") logger.info(f"Threads in execution: {len(threads)}")
pass pass

View File

@@ -1,6 +1,6 @@
import re import re
def extract_value(patterns, primary_source, secondary_source, default='Not Defined'): def extract_value(patterns: list, primary_source: str, secondary_source: str, default='Not Defined') -> str:
"""Extracts the first match for a list of patterns from the primary source. """Extracts the first match for a list of patterns from the primary source.
Falls back to the secondary source if no match is found. Falls back to the secondary source if no match is found.
""" """

View File

@@ -3,7 +3,7 @@ import mysql.connector
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def connetti_db(cfg): def connetti_db(cfg: object) -> object:
""" """
Establishes a connection to a MySQL database. Establishes a connection to a MySQL database.

View File

@@ -1,32 +0,0 @@
#!.venv/bin/python
from utils.database.connection import connetti_db
import logging
logger = logging.getLogger(__name__)
def load_data(cfg, matrice_valori):
sql_insert_RAWDATA = f'''
INSERT IGNORE INTO {cfg.dbname}.{cfg.dbrawdata} (
`UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`,
`Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`,
`Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`,
`BatLevelModule`,`TemperatureModule`, `RssiModule`
)
VALUES (
%s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s
)
'''
with connetti_db(cfg) as conn:
cur = conn.cursor()
try:
cur.executemany(sql_insert_RAWDATA, matrice_valori)
conn.commit()
except Exception as e:
conn.rollback()
print(f'Error: {e}')
finally:
cur.close()
conn.close()

View File

@@ -0,0 +1,63 @@
#!.venv/bin/python
from utils.database.connection import connetti_db
import logging
logger = logging.getLogger(__name__)
CSV_RECEIVED = 0
DATA_LOADED = 1
DATA_ELABORATED = 2
def load_data(cfg: object, matrice_valori: list) -> bool :
sql_insert_RAWDATA = f'''
INSERT IGNORE INTO {cfg.dbname}.{cfg.dbrawdata} (
`UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`,
`Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`,
`Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`,
`BatLevelModule`,`TemperatureModule`, `RssiModule`
)
VALUES (
%s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s
)
'''
with connetti_db(cfg) as conn:
cur = conn.cursor()
try:
cur.executemany(sql_insert_RAWDATA, matrice_valori)
conn.commit()
logging.info("Data loaded.")
rc = True
except Exception as e:
conn.rollback()
logging.error(f"Error: {e}.")
rc = False
finally:
conn.close()
return rc
def update_status(cfg: object, id: int, status: int) -> None:
with connetti_db(cfg) as conn:
cur = conn.cursor()
try:
cur.execute(f'update {cfg.dbname}.{cfg.dbrectable} set locked = 0, status = {status} where id = {id}')
conn.commit()
except Exception as e:
conn.rollback()
logging.error(f'Error: {e}')
def get_matlab_cmd(cfg: object, unit: str, tool: str) -> tuple:
with connetti_db(cfg) as conn:
cur = conn.cursor()
try:
cur.execute(f'''select m.matcall, t.ftp_send , t.unit_id, s.`desc` as statustools, t.api_send, u.inoltro_api, u.inoltro_api_url, u.inoltro_api_bearer_token, IFNULL(u.duedate, "") as duedate
from matfuncs as m
inner join tools as t on t.matfunc = m.id
inner join units as u on u.id = t.unit_id
inner join statustools as s on t.statustool_id = s.id
where t.name = "{tool}" and u.name = "{unit}"''')
return cur.fetchone()
except Exception as e:
logging.error(f'Error: {e}')

View File

@@ -3,7 +3,7 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def get_nodes_type(cfg, tool, unit): def get_nodes_type(cfg: object, tool: str, unit: str) -> tuple:
with connetti_db(cfg) as conn: with connetti_db(cfg) as conn:
cur = conn.cursor(dictionary=True) cur = conn.cursor(dictionary=True)
@@ -16,7 +16,7 @@ def get_nodes_type(cfg, tool, unit):
WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}' WHERE y.type NOT IN ('Anchor Link', 'None') AND t.name = '{tool}' AND u.name = '{unit}'
ORDER BY n.num; ORDER BY n.num;
""" """
logger.info(f"{unit} - {tool}: Executing query: {query}") #logger.info(f"{unit} - {tool}: Executing query: {query}")
cur.execute(query) cur.execute(query)
results = cur.fetchall() results = cur.fetchall()
logger.info(f"{unit} - {tool}: {cur.rowcount} rows selected to get node type/Ain/Din/channels.") logger.info(f"{unit} - {tool}: {cur.rowcount} rows selected to get node type/Ain/Din/channels.")

View File

@@ -9,7 +9,7 @@ from utils.config.parser import extract_value
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def on_file_received(self, file): def on_file_received(self: object, file: str) -> None:
"""Handles the event when a file is successfully received. """Handles the event when a file is successfully received.
Args: Args:

View File

@@ -9,7 +9,7 @@ from utils.database.connection import connetti_db
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def ftp_SITE_ADDU(self, line): def ftp_SITE_ADDU(self: object, line: str) -> None:
"""Adds a virtual user, creates their directory, and saves their details to the database. """Adds a virtual user, creates their directory, and saves their details to the database.
""" """
cfg = self.cfg cfg = self.cfg
@@ -50,7 +50,7 @@ def ftp_SITE_ADDU(self, line):
self.respond(f'501 SITE ADDU failed: {e}.') self.respond(f'501 SITE ADDU failed: {e}.')
print(e) print(e)
def ftp_SITE_DISU(self, line): def ftp_SITE_DISU(self: object, line: str) -> None:
"""Removes a virtual user from the authorizer and marks them as deleted in the database.""" """Removes a virtual user from the authorizer and marks them as deleted in the database."""
cfg = self.cfg cfg = self.cfg
parms = line.split() parms = line.split()
@@ -77,7 +77,7 @@ def ftp_SITE_DISU(self, line):
self.respond('501 SITE DISU failed.') self.respond('501 SITE DISU failed.')
print(e) print(e)
def ftp_SITE_ENAU(self, line): def ftp_SITE_ENAU(self: object, line: str) -> None:
"""Restores a virtual user by updating their status in the database and adding them back to the authorizer.""" """Restores a virtual user by updating their status in the database and adding them back to the authorizer."""
cfg = self.cfg cfg = self.cfg
parms = line.split() parms = line.split()
@@ -116,7 +116,7 @@ def ftp_SITE_ENAU(self, line):
self.respond('501 SITE ENAU failed.') self.respond('501 SITE ENAU failed.')
print(e) print(e)
def ftp_SITE_LSTU(self, line): def ftp_SITE_LSTU(self: object, line: str) -> None:
"""Lists all virtual users from the database.""" """Lists all virtual users from the database."""
cfg = self.cfg cfg = self.cfg
users_list = [] users_list = []

View File

@@ -1,3 +1,3 @@
def chi_sono(unit, tool): def main_loader(unit, tool):
print(f'{__name__}: {unit} - {tool}') print(f'{__name__}: {unit} - {tool}')
return f'{__name__}: {unit} - {tool}' return f'{__name__}: {unit} - {tool}'

View File

@@ -4,6 +4,8 @@ import utils.timestamp.date_check as date_check
import logging import logging
import re import re
from itertools import islice
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def get_data(cfg: object, id: int) -> tuple: def get_data(cfg: object, id: int) -> tuple:
@@ -92,9 +94,27 @@ def make_loc_matrix(cfg: object, id: int) -> list:
matrice_valori = [] matrice_valori = []
pattern = r'(?:\d{4}/\d{2}/\d{2}|\d{2}/\d{2}/\d{4}) \d{2}:\d{2}:\d{2}(;[^;]+)+' pattern = r'(?:\d{4}/\d{2}/\d{2}|\d{2}/\d{2}/\d{4}) \d{2}:\d{2}:\d{2}(;[^;]+)+'
for riga in [riga for riga in righe if re.match(pattern, riga)]: for riga in [riga for riga in righe if re.match(pattern, riga)]:
timestamp, batlevel, temperature, ain1, ain2, din1, din2, = riga.split(';') timestamp, battery_voltage, unit_temperature, analog_input1, analog_input2, digital_input1, digital_input2 = riga.split(';')
EventDate, EventTime = timestamp.split(' ') event_date, event_time = timestamp.split(' ')
valori = [ain1, ain2, din1, din2] valori = [analog_input1, analog_input2, digital_input1, digital_input2]
matrice_valori.append([UnitName, ToolNameID, 1, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori)))) matrice_valori.append([UnitName, ToolNameID, 1, date_check.conforma_data(event_date), event_time, battery_voltage, unit_temperature] + valori + ([None] * (19 - len(valori))))
return matrice_valori return matrice_valori
def make_matrix_with_channels(cfg: object, id: int, node_channels: list) -> list:
UnitName, ToolNameID, ToolData = get_data(cfg, id)
righe = ToolData.splitlines()
matrice_valori = []
for riga in [riga for riga in righe if ';|;' in riga]:
timestamp, batlevel, temperature, rilevazioni = riga.split(';',3)
EventDate, EventTime = timestamp.split(' ')
valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|']
valori_iter = iter(valori_splitted)
valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels]
for num_nodo, valori in enumerate(valori_nodi, start=1):
matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori))))
return matrice_valori

View File

@@ -1,3 +1,4 @@
def chi_sono(unit, tool): from .g801_mums import main_loader as g801_mums_main_loader
print(f'{__name__}: {unit} - {tool}')
return f'{__name__}: {unit} - {tool}' def main_loader(cfg: object, id: int) -> None:
return g801_mums_main_loader(cfg, id)

View File

@@ -1,5 +1,5 @@
#!.venv/bin/python #!.venv/bin/python
from utils.database.loader import load_data from utils.database.loader_action import load_data
from utils.parsers.data_preparation import make_loc_matrix from utils.parsers.data_preparation import make_loc_matrix
import logging import logging

View File

@@ -1,6 +1,6 @@
#!.venv/bin/python #!.venv/bin/python
# Import necessary modules # Import necessary modules
from utils.database.loader import load_data from utils.database.loader_action import load_data, update_status, DATA_LOADED
from utils.parsers.data_preparation import make_matrix from utils.parsers.data_preparation import make_matrix
import logging import logging
@@ -11,4 +11,5 @@ def main_loader(cfg: object, id: int) -> None:
# Create a matrix of values from the data # Create a matrix of values from the data
matrice_valori = make_matrix(cfg, id) matrice_valori = make_matrix(cfg, id)
# Load the data into the database # Load the data into the database
load_data(cfg, matrice_valori) if load_data(cfg, matrice_valori):
update_status(cfg, id, DATA_LOADED)

View File

@@ -1,5 +1,4 @@
import time from .g801_mums import main_loader as g801_mums_main_loader
def chi_sono(unit, tool):
print(f'{__name__}: {unit} - {tool}') def main_loader(cfg: object, id: int) -> None:
time.sleep(20) return g801_mums_main_loader(cfg, id)
return f'{__name__}: {unit} - {tool}'

View File

@@ -1,5 +1,4 @@
import time from .g801_loc import main_loader as g801_loc_main_loader
def chi_sono(unit, tool):
print(f'{__name__}: {unit} - {tool}') def main_loader(cfg: object, id: int) -> None:
time.sleep(20) return g801_loc_main_loader(cfg, id)
return f'{__name__}: {unit} - {tool}'

View File

@@ -1,5 +1,4 @@
import time from .g801_mums import main_loader as g801_mums_main_loader
def chi_sono(unit, tool):
print(f'{__name__}: {unit} - {tool}') def main_loader(cfg: object, id: int) -> None:
time.sleep(20) return g801_mums_main_loader(cfg, id)
return f'{__name__}: {unit} - {tool}'

View File

@@ -1,5 +1,4 @@
import time from .g801_mums import main_loader as g801_mums_main_loader
def chi_sono(unit, tool):
print(f'{__name__}: {unit} - {tool}') def main_loader(cfg: object, id: int) -> None:
time.sleep(20) return g801_mums_main_loader(cfg, id)
return f'{__name__}: {unit} - {tool}'

View File

@@ -1,5 +1,4 @@
import time from .g801_mux import main_loader as g801_mux_main_loader
def chi_sono(unit, tool):
print(f'{__name__}: {unit} - {tool}') def main_loader(cfg: object, id: int) -> None:
time.sleep(20) return g801_mux_main_loader(cfg, id)
return f'{__name__}: {unit} - {tool}'

View File

@@ -1,5 +1,4 @@
import time from .tlp_tlp import main_loader as tlp_tlp_main_loader
def chi_sono(unit, tool):
print(f'{__name__}: {unit} - {tool}') def main_loader(cfg: object, id: int) -> None:
time.sleep(20) return tlp_tlp_main_loader(cfg, id)
return f'{__name__}: {unit} - {tool}'

View File

@@ -0,0 +1,4 @@
from .cr1000x_cr1000x import main_loader as cr1000x_cr1000x_main_loader
def main_loader(cfg: object, id: int) -> None:
return cr1000x_cr1000x_main_loader(cfg, id)

View File

@@ -1,5 +0,0 @@
import time
def chi_sono(unit, tool):
print(f'{__name__}: {unit} - {tool}')
time.sleep(20)
return f'{__name__}: {unit} - {tool}'