GD RSSI + normalizza orario

This commit is contained in:
2025-07-27 23:20:18 +02:00
parent dc20713cad
commit d6f1998d78
4 changed files with 94 additions and 55 deletions

View File

@@ -1,10 +1,11 @@
#!.venv/bin/python #!.venv/bin/python
from utils.database.nodes_query import get_nodes_type from utils.database.nodes_query import get_nodes_type
import utils.timestamp.date_check as date_check from utils.timestamp.date_check import normalizza_data, normalizza_orario
import logging import logging
import re import re
from itertools import islice from itertools import islice
from datetime import datetime, timedelta
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -61,7 +62,7 @@ async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list:
valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;') # Toglie '|;' iniziali, toglie eventuali ';' finali, dividi per ';|;' valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;') # Toglie '|;' iniziali, toglie eventuali ';' finali, dividi per ';|;'
for num_nodo, valori_nodo in enumerate(valori_nodi, start=1): for num_nodo, valori_nodo in enumerate(valori_nodi, start=1):
valori = valori_nodo.split(';') valori = valori_nodo.split(';')
matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori)))) matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
return matrice_valori return matrice_valori
@@ -87,13 +88,13 @@ async def make_ain_din_matrix(cfg: object, id: int, pool: object) -> list:
EventDate, EventTime = timestamp.split(' ') EventDate, EventTime = timestamp.split(' ')
if any(node_ains): if any(node_ains):
for node_num, analog_act in enumerate([analog_input1, analog_input2], start=1): for node_num, analog_act in enumerate([analog_input1, analog_input2], start=1):
matrice_valori.append([UnitName, ToolNameID, node_num, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + [analog_act] + ([None] * (19 - 1))) matrice_valori.append([UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + [analog_act] + ([None] * (19 - 1)))
else: else:
logger.info(f"Nessun Ingresso analogico per {UnitName} {ToolNameID}") logger.info(f"Nessun Ingresso analogico per {UnitName} {ToolNameID}")
if any(node_dins): if any(node_dins):
start_node = 3 if any(node_ains) else 1 start_node = 3 if any(node_ains) else 1
for node_num, digital_act in enumerate([digital_input1, digital_input2], start=start_node): for node_num, digital_act in enumerate([digital_input1, digital_input2], start=start_node):
matrice_valori.append([UnitName, ToolNameID, node_num, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + [digital_act] + ([None] * (19 - 1))) matrice_valori.append([UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + [digital_act] + ([None] * (19 - 1)))
else: else:
logger.info(f"Nessun Ingresso digitale per {UnitName} {ToolNameID}") logger.info(f"Nessun Ingresso digitale per {UnitName} {ToolNameID}")
@@ -123,7 +124,7 @@ async def make_channels_matrix(cfg: object, id: int, pool: object) -> list:
valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels] valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels]
for num_nodo, valori in enumerate(valori_nodi, start=1): for num_nodo, valori in enumerate(valori_nodi, start=1):
matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori)))) matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
return matrice_valori return matrice_valori
@@ -155,7 +156,7 @@ async def make_musa_matrix(cfg: object, id: int, pool: object) -> list:
valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels] valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels]
for num_nodo, valori in enumerate(valori_nodi, start=1): for num_nodo, valori in enumerate(valori_nodi, start=1):
matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori)))) matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
return matrice_valori return matrice_valori
@@ -182,7 +183,7 @@ async def make_tlp_matrix(cfg: object, id: int, pool: object) -> list:
lista_rilevazioni.append(barometer) lista_rilevazioni.append(barometer)
valori_nodi = [lista_rilevazioni[i:i + valori_x_nodo] for i in range(0, len(lista_rilevazioni), valori_x_nodo)] valori_nodi = [lista_rilevazioni[i:i + valori_x_nodo] for i in range(0, len(lista_rilevazioni), valori_x_nodo)]
for num_nodo, valori in enumerate(valori_nodi, start=1): for num_nodo, valori in enumerate(valori_nodi, start=1):
matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori)))) matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
return matrice_valori return matrice_valori
@@ -207,9 +208,16 @@ async def make_gd_matrix(cfg: object, id: int, pool: object) -> list:
EventDate, EventTime = timestamp.split(' ') EventDate, EventTime = timestamp.split(' ')
#logger.debug(f"GD id {id}: {pattern} {rilevazioni}") #logger.debug(f"GD id {id}: {pattern} {rilevazioni}")
if re.search(pattern, rilevazioni): if re.search(pattern, rilevazioni):
if len(matrice_valori) == 0:
matrice_valori.append(['RSSI'])
batlevel, temperature, rssi = rilevazioni.split(';') batlevel, temperature, rssi = rilevazioni.split(';')
#logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {rssi}") #logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {rssi}")
matrice_valori.append([UnitName, ToolNameID, 1, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + [rssi] + ([None] * 18))
gd_timestamp = datetime.strptime(f"{normalizza_data(EventDate)} {normalizza_orario(EventTime)}", "%Y-%m-%d %H:%M:%S")
start_timestamp = gd_timestamp - timedelta(seconds=45)
end_timestamp = gd_timestamp + timedelta(seconds=45)
matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 1, f"{start_timestamp:%Y-%m-%d %H:%M:%S}", f"{end_timestamp:%Y-%m-%d %H:%M:%S}", f"{gd_timestamp:%Y-%m-%d %H:%M:%S}", batlevel, temperature, int(rssi[:-2])])
elif all(char == ';' for char in rilevazioni): elif all(char == ';' for char in rilevazioni):
pass pass
elif ';|;' in rilevazioni: elif ';|;' in rilevazioni:
@@ -217,7 +225,7 @@ async def make_gd_matrix(cfg: object, id: int, pool: object) -> list:
batlevel, temperature = unit_metrics.split(';') batlevel, temperature = unit_metrics.split(';')
#logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {data}") #logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {data}")
valori = data.split(';') valori = data.split(';')
matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 2, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori)))) matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 2, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
else: else:
logger.warning(f"GD id {id}: dati non trattati - {rilevazioni}") logger.warning(f"GD id {id}: dati non trattati - {rilevazioni}")

View File

@@ -31,7 +31,7 @@ async def main_loader(cfg: object, id: int, pool: object, action: str) -> None:
logger.info("matrice valori creata") logger.info("matrice valori creata")
# Load the data into the database # Load the data into the database
if await load_data(cfg, matrice_valori, pool): if await load_data(cfg, matrice_valori, pool, type=action):
await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool) await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
await unlock(cfg, id, pool) await unlock(cfg, id, pool)
else: else:

View File

@@ -6,7 +6,8 @@ from utils.database import FLAG_TO_TIMESTAMP, BATCH_SIZE
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
async def load_data(cfg: object, matrice_valori: list, pool: object, type: str) -> bool:
"""Carica una lista di record di dati grezzi nel database. """Carica una lista di record di dati grezzi nel database.
Esegue un'operazione di inserimento massivo (executemany) per caricare i dati. Esegue un'operazione di inserimento massivo (executemany) per caricare i dati.
@@ -17,6 +18,7 @@ async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
cfg (object): L'oggetto di configurazione contenente i nomi delle tabelle e i parametri di re-tentativo. cfg (object): L'oggetto di configurazione contenente i nomi delle tabelle e i parametri di re-tentativo.
matrice_valori (list): Una lista di tuple, dove ogni tupla rappresenta una riga da inserire. matrice_valori (list): Una lista di tuple, dove ogni tupla rappresenta una riga da inserire.
pool (object): Il pool di connessioni al database. pool (object): Il pool di connessioni al database.
type (str): tipo di caricamento dati. Per GD fa l'update del tool DT corrispondente
Returns: Returns:
bool: True se il caricamento ha avuto successo, False altrimenti. bool: True se il caricamento ha avuto successo, False altrimenti.
@@ -24,7 +26,23 @@ async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
if not matrice_valori: if not matrice_valori:
logger.info("Nulla da caricare.") logger.info("Nulla da caricare.")
return True return True
sql_insert_RAWDATA = f"""
if type == "gd" and matrice_valori[0][0] == "RSSI":
matrice_valori.pop(0)
sql_load_RAWDATA = f"""
UPDATE {cfg.dbrawdata} t1
JOIN (
SELECT id
FROM {cfg.dbrawdata}
WHERE UnitName = %s AND ToolNameID = %s AND NodeNum = %s
AND TIMESTAMP(`EventDate`, `EventTime`) BETWEEN %s AND %s
ORDER BY ABS(TIMESTAMPDIFF(SECOND, TIMESTAMP(`EventDate`, `EventTime`), %s))
LIMIT 1
) t2 ON t1.id = t2.id
SET t1.BatLevelModule = %s, t1.TemperatureModule = %s, t1.RssiModule = %s
"""
else:
sql_load_RAWDATA = f"""
INSERT INTO {cfg.dbrawdata} ( INSERT INTO {cfg.dbrawdata} (
`UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`, `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`,
`Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`, `Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`,
@@ -61,7 +79,7 @@ async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
`RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`), `RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`),
`Created_at` = NOW() `Created_at` = NOW()
""" """
#logger.info(f"Query insert: {sql_insert_RAWDATA}.") #logger.info(f"Query insert: {sql_load_RAWDATA}.")
#logger.info(f"Matrice valori da inserire: {matrice_valori}.") #logger.info(f"Matrice valori da inserire: {matrice_valori}.")
rc = False rc = False
async with pool.acquire() as conn: async with pool.acquire() as conn:
@@ -73,7 +91,7 @@ async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
for i in range(0, len(matrice_valori), BATCH_SIZE): for i in range(0, len(matrice_valori), BATCH_SIZE):
batch = matrice_valori[i:i + BATCH_SIZE] batch = matrice_valori[i:i + BATCH_SIZE]
await cur.executemany(sql_insert_RAWDATA, batch) await cur.executemany(sql_load_RAWDATA, batch)
await conn.commit() await conn.commit()
logger.info(f"Completed batch {i//BATCH_SIZE + 1}/{(len(matrice_valori)-1)//BATCH_SIZE + 1}") logger.info(f"Completed batch {i//BATCH_SIZE + 1}/{(len(matrice_valori)-1)//BATCH_SIZE + 1}")

View File

@@ -1,14 +1,14 @@
from datetime import datetime from datetime import datetime
def conforma_data(data_string: str)->str: def normalizza_data(data_string: str)->str:
""" """
Conforma una stringa di data al formato YYYY-MM-DD, provando diversi formati di input. Normalizza una stringa di data al formato YYYY-MM-DD, provando diversi formati di input.
Args: Args:
data_string (str): La stringa di data da conformare. data_string (str): La stringa di data da normalizzare.
Returns: Returns:
str: La data conformata nel formato YYYY-MM-DD, str: La data normalizzata nel formato YYYY-MM-DD,
o None se la stringa non può essere interpretata come una data. o None se la stringa non può essere interpretata come una data.
""" """
formato_desiderato = "%Y-%m-%d" formato_desiderato = "%Y-%m-%d"
@@ -22,3 +22,16 @@ def conforma_data(data_string: str)->str:
continue # Prova il formato successivo se quello attuale fallisce continue # Prova il formato successivo se quello attuale fallisce
return None # Se nessun formato ha avuto successo return None # Se nessun formato ha avuto successo
def normalizza_orario(orario_str):
try:
# Prova prima con HH:MM:SS
dt = datetime.strptime(orario_str, "%H:%M:%S")
return dt.strftime("%H:%M:%S")
except ValueError:
try:
# Se fallisce, prova con HH:MM
dt = datetime.strptime(orario_str, "%H:%M")
return dt.strftime("%H:%M:%S")
except ValueError:
return orario_str # Restituisce originale se non parsabile