This commit is contained in:
2025-08-02 19:22:48 +02:00
parent 6ff97316dc
commit fdefd0a430
17 changed files with 633 additions and 173 deletions

View File

@@ -46,11 +46,29 @@ class Config:
# unit setting
self.units_name = [part for part in c.get("unit", "Names").split('|')]
self.units_type = [part for part in c.get("unit", "Types").split('|')]
self.units_alias = {
key: value
for item in c.get("unit", "Alias").split('|')
for key, value in [item.split(':', 1)]
}
#self.units_header = {key: int(value) for pair in c.get("unit", "Headers").split('|') for key, value in [pair.split(':')]}
# tool setting
self.tools_name = [part for part in c.get("tool", "Names").split('|')]
self.tools_type = [part for part in c.get("tool", "Types").split('|')]
self.tools_alias = {
key: value
for item in c.get("tool", "Alias").split('|')
for key, value in [item.split(':', 1)]
}
# csv info
self.csv_infos = [part for part in c.get("csv", "Infos").split('|')]
# TS pini path match
self.ts_pini_path_match = {
key: key[1:-1] if value == '=' else value
for item in c.get("ts_pini", "path_match").split('|')
for key, value in [item.split(':', 1)]
}

View File

@@ -23,10 +23,10 @@ async def get_data(cfg: object, id: int, pool: object) -> tuple:
"""
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbrectable} where id = {id}')
unit_name, tool_name, tool_data = await cur.fetchone()
await cur.execute(f'select filename, unit_name, tool_name, tool_data from {cfg.dbrectable} where id = {id}')
filename, unit_name, tool_name, tool_data = await cur.fetchone()
return unit_name, tool_name, tool_data
return filename, unit_name, tool_name, tool_data
async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list:
"""
@@ -39,7 +39,7 @@ async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list:
Returns:
list: A list of lists, where each inner list represents a row in the matrix.
"""
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
righe = ToolData.splitlines()
matrice_valori = []
"""
@@ -78,7 +78,7 @@ async def make_ain_din_matrix(cfg: object, id: int, pool: object) -> list:
Returns:
list: A list of lists, where each inner list represents a row in the matrix.
"""
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
righe = ToolData.splitlines()
matrice_valori = []
@@ -112,7 +112,7 @@ async def make_channels_matrix(cfg: object, id: int, pool: object) -> list:
Returns:
list: A list of lists, where each inner list represents a row in the matrix.
"""
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
righe = ToolData.splitlines()
matrice_valori = []
@@ -140,7 +140,7 @@ async def make_musa_matrix(cfg: object, id: int, pool: object) -> list:
Returns:
list: A list of lists, where each inner list represents a row in the matrix.
"""
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool)
righe = ToolData.splitlines()
matrice_valori = []
@@ -173,7 +173,7 @@ async def make_tlp_matrix(cfg: object, id: int, pool: object) -> list:
Returns:
list: A list of lists, where each inner list represents a row in the matrix.
"""
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
righe = ToolData.splitlines()
valori_x_nodo = 2
matrice_valori = []
@@ -200,7 +200,7 @@ async def make_gd_matrix(cfg: object, id: int, pool: object) -> list:
Returns:
list: A list of lists, where each inner list represents a row in the matrix.
"""
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
righe = ToolData.splitlines()
matrice_valori = []
pattern = r';-?\d+dB$'

View File

@@ -1,10 +1,10 @@
import re
def extract_value(patterns: list, primary_source: str, secondary_source: str, default='Not Defined') -> str:
def extract_value(patterns: list, primary_source: str, secondary_source: str = None, default='Not Defined') -> str:
for source in (primary_source, secondary_source):
for source in [source for source in (primary_source, secondary_source) if source is not None]:
for pattern in patterns:
matches = re.findall(pattern, source, re.IGNORECASE)
if matches:
return matches[0] # Return the first match immediately
return default # Return default if no matches are found
return default # Return default if no matches are found

View File

@@ -1,6 +1,6 @@
import os
import logging
import re
import mysql.connector
from utils.database.connection import connetti_db
@@ -30,17 +30,38 @@ def on_file_received(self: object, file: str) -> None:
unit_type = extract_value(cfg.units_type, filename, str(lines[0:10]))
tool_name = extract_value(cfg.tools_name, filename, str(lines[0:10]))
tool_type = extract_value(cfg.tools_type, filename, str(lines[0:10]))
tool_info = "{}"
unit_type = cfg.units_alias.get(unit_type.upper(), unit_type)
try:
conn = connetti_db(cfg)
except mysql.connector.Error as e:
print(f"Error: {e}")
logger.error(f'{e}')
# Create a cursor
cur = conn.cursor()
# da estrarre in un modulo
if (unit_type.upper() == "ISI CSV LOG" and tool_type.upper() == "VULINK" ):
serial_number = filename.split('_')[0]
tool_info = f'{{"serial_number": {serial_number}}}'
try:
cur.execute(f"SELECT unit_name, tool_name FROM {cfg.dbname}.vulink_tools WHERE serial_number = '{serial_number}'")
unit_name, tool_name = cur.fetchone()
except Exception as e:
logger.warning(f'{tool_type} serial number {serial_number} not found in table vulink_tools. {e}')
# da estrarre in un modulo
if (unit_type.upper() == "STAZIONETOTALE" and tool_type.upper() == "INTEGRITY MONITOR" ):
escaped_keys = [re.escape(key) for key in cfg.ts_pini_path_match.keys()]
stazione = extract_value(escaped_keys, filename)
if stazione:
tool_info = f'{{"Stazione": "{cfg.ts_pini_path_match.get(stazione)}"}}'
try:
cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s, %s, %s, %s, %s, %s)", (filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), ''.join(lines)))
cur.execute(f"INSERT INTO {cfg.dbname}.{cfg.dbrectable} (filename, unit_name, unit_type, tool_name, tool_type, tool_data, tool_info) VALUES (%s, %s, %s, %s, %s, %s, %s)", (filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), ''.join(lines), tool_info))
conn.commit()
conn.close()

View File

@@ -0,0 +1,45 @@
import asyncio
import tempfile
import os
from utils.database import WorkflowFlags
from utils.database.loader_action import update_status, unlock
from utils.csv.data_preparation import get_data
import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Creare un file temporaneo
with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file:
temp_file.write(ToolData)
temp_filename = temp_file.name
try:
# Usa asyncio.subprocess per vero async
process = await asyncio.create_subprocess_exec(
'python3', 'old_script/TS_PiniScript.py', temp_filename,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
result_stdout = stdout.decode('utf-8')
result_stderr = stderr.decode('utf-8')
finally:
# Pulire il file temporaneo
os.unlink(temp_filename)
if process.returncode != 0:
logger.error(f"Errore nell'esecuzione del programma TS_PiniScript.py: {result_stderr}")
raise Exception(f"Errore nel programma: {result_stderr}")
else:
logger.info("Programma TS_PiniScript.py eseguito con successo.")
logger.debug(f"Stdout: {result_stdout}")
await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await unlock(cfg, id, pool)

View File

@@ -1,8 +1,9 @@
import subprocess
import asyncio
import tempfile
import os
from utils.database.loader_action import DATA_LOADED, update_status, unlock
from utils.database import WorkflowFlags
from utils.database.loader_action import update_status, unlock
from utils.csv.data_preparation import get_data
import logging
@@ -11,25 +12,34 @@ logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Creare un file temporaneo
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as temp_file:
with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file:
temp_file.write(ToolData)
temp_filename = temp_file.name
try:
# Eseguire il programma con il file temporaneo
result = await subprocess.run(['python3', 'old_script/TS_PiniScript.py', temp_filename], capture_output=True, text=True)
print(result.stdout)
print(result.stderr)
# Usa asyncio.subprocess per vero async
process = await asyncio.create_subprocess_exec(
'python3', 'old_script/vulinkScript.py', temp_filename,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
result_stdout = stdout.decode('utf-8')
result_stderr = stderr.decode('utf-8')
finally:
# Pulire il file temporaneo
os.unlink(temp_filename)
if result.returncode != 0:
logger.error(f"Errore nell'esecuzione del programma TS_PiniScript.py: {result.stderr}")
raise Exception(f"Errore nel programma: {result.stderr}")
if process.returncode != 0:
logger.error(f"Errore nell'esecuzione del programma vulinkScript.py: {result_stderr}")
raise Exception(f"Errore nel programma: {result_stderr}")
else:
logger.info(f"Programma TS_PiniScript.py eseguito con successo: {result.stdout}")
await update_status(cfg, id, DATA_LOADED, pool)
logger.info("Programma vulinkScript.py eseguito con successo.")
logger.debug(f"Stdout: {result_stdout}")
await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await unlock(cfg, id, pool)

View File

@@ -0,0 +1,45 @@
import asyncio
import tempfile
import os
from utils.database import WorkflowFlags
from utils.database.loader_action import update_status, unlock
from utils.csv.data_preparation import get_data
import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Creare un file temporaneo
with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file:
temp_file.write(ToolData)
temp_filename = temp_file.name
try:
# Usa asyncio.subprocess per vero async
process = await asyncio.create_subprocess_exec(
'python3', 'old_script/TS_PiniScript.py', temp_filename,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
result_stdout = stdout.decode('utf-8')
result_stderr = stderr.decode('utf-8')
finally:
# Pulire il file temporaneo
os.unlink(temp_filename)
if process.returncode != 0:
logger.error(f"Errore nell'esecuzione del programma TS_PiniScript.py: {result_stderr}")
raise Exception(f"Errore nel programma: {result_stderr}")
else:
logger.info("Programma TS_PiniScript.py eseguito con successo.")
logger.debug(f"Stdout: {result_stdout}")
await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await unlock(cfg, id, pool)

View File

@@ -0,0 +1,45 @@
import asyncio
import tempfile
import os
from utils.database import WorkflowFlags
from utils.database.loader_action import update_status, unlock
from utils.csv.data_preparation import get_data
import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Creare un file temporaneo
with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file:
temp_file.write(ToolData)
temp_filename = temp_file.name
try:
# Usa asyncio.subprocess per vero async
process = await asyncio.create_subprocess_exec(
'python3', 'old_script/TS_PiniScript.py', temp_filename,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
result_stdout = stdout.decode('utf-8')
result_stderr = stderr.decode('utf-8')
finally:
# Pulire il file temporaneo
os.unlink(temp_filename)
if process.returncode != 0:
logger.error(f"Errore nell'esecuzione del programma TS_PiniScript.py: {result_stderr}")
raise Exception(f"Errore nel programma: {result_stderr}")
else:
logger.info("Programma TS_PiniScript.py eseguito con successo.")
logger.debug(f"Stdout: {result_stdout}")
await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await unlock(cfg, id, pool)

View File

@@ -0,0 +1,45 @@
import asyncio
import tempfile
import os
from utils.database import WorkflowFlags
from utils.database.loader_action import update_status, unlock
from utils.csv.data_preparation import get_data
import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Creare un file temporaneo
with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file:
temp_file.write(ToolData)
temp_filename = temp_file.name
try:
# Usa asyncio.subprocess per vero async
process = await asyncio.create_subprocess_exec(
'python3', 'old_script/TS_PiniScript.py', temp_filename,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
result_stdout = stdout.decode('utf-8')
result_stderr = stderr.decode('utf-8')
finally:
# Pulire il file temporaneo
os.unlink(temp_filename)
if process.returncode != 0:
logger.error(f"Errore nell'esecuzione del programma TS_PiniScript.py: {result_stderr}")
raise Exception(f"Errore nel programma: {result_stderr}")
else:
logger.info("Programma TS_PiniScript.py eseguito con successo.")
logger.debug(f"Stdout: {result_stdout}")
await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await unlock(cfg, id, pool)

View File

@@ -0,0 +1,45 @@
import asyncio
import tempfile
import os
from utils.database import WorkflowFlags
from utils.database.loader_action import update_status, unlock
from utils.csv.data_preparation import get_data
import logging
logger = logging.getLogger(__name__)
async def main_loader(cfg: object, id: int, pool: object) -> None:
filename, UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool)
# Creare un file temporaneo
with tempfile.NamedTemporaryFile(mode='w', prefix= filename, suffix='.csv', delete=False) as temp_file:
temp_file.write(ToolData)
temp_filename = temp_file.name
try:
# Usa asyncio.subprocess per vero async
process = await asyncio.create_subprocess_exec(
'python3', 'old_script/TS_PiniScript.py', temp_filename,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
result_stdout = stdout.decode('utf-8')
result_stderr = stderr.decode('utf-8')
finally:
# Pulire il file temporaneo
os.unlink(temp_filename)
if process.returncode != 0:
logger.error(f"Errore nell'esecuzione del programma TS_PiniScript.py: {result_stderr}")
raise Exception(f"Errore nel programma: {result_stderr}")
else:
logger.info("Programma TS_PiniScript.py eseguito con successo.")
logger.debug(f"Stdout: {result_stdout}")
await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
await update_status(cfg, id, WorkflowFlags.DATA_ELABORATED, pool)
await unlock(cfg, id, pool)