From d6f1998d780a6f9070d6c0da3b88f7edc76b387e Mon Sep 17 00:00:00 2001
From: alex
Date: Sun, 27 Jul 2025 23:20:18 +0200
Subject: [PATCH] GD RSSI + time normalization

---
 utils/csv/data_preparation.py   | 26 ++++++---
 utils/csv/loaders.py            |  2 +-
 utils/database/loader_action.py | 98 +++++++++++++++++++--------------
 utils/timestamp/date_check.py   | 23 ++++++--
 4 files changed, 94 insertions(+), 55 deletions(-)

diff --git a/utils/csv/data_preparation.py b/utils/csv/data_preparation.py
index 3d84b67..78af1b9 100644
--- a/utils/csv/data_preparation.py
+++ b/utils/csv/data_preparation.py
@@ -1,10 +1,11 @@
 #!.venv/bin/python
 from utils.database.nodes_query import get_nodes_type
-import utils.timestamp.date_check as date_check
+from utils.timestamp.date_check import normalizza_data, normalizza_orario
 import logging
 import re
 from itertools import islice
+from datetime import datetime, timedelta
 
 logger = logging.getLogger(__name__)
 
@@ -61,7 +62,7 @@ async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list:
     valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;')  # Strip leading '|;', drop any trailing ';', split on ';|;'
     for num_nodo, valori_nodo in enumerate(valori_nodi, start=1):
         valori = valori_nodo.split(';')
-        matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori))))
+        matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
 
     return matrice_valori
 
@@ -87,13 +88,13 @@ async def make_ain_din_matrix(cfg: object, id: int, pool: object) -> list:
         EventDate, EventTime = timestamp.split(' ')
         if any(node_ains):
             for node_num, analog_act in enumerate([analog_input1, analog_input2], start=1):
-                matrice_valori.append([UnitName, ToolNameID, node_num, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + [analog_act] + ([None] * (19 - 1)))
+                matrice_valori.append([UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + [analog_act] + ([None] * (19 - 1)))
         else:
             logger.info(f"No analog input for {UnitName} {ToolNameID}")
         if any(node_dins):
             start_node = 3 if any(node_ains) else 1
             for node_num, digital_act in enumerate([digital_input1, digital_input2], start=start_node):
-                matrice_valori.append([UnitName, ToolNameID, node_num, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + [digital_act] + ([None] * (19 - 1)))
+                matrice_valori.append([UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + [digital_act] + ([None] * (19 - 1)))
         else:
             logger.info(f"No digital input for {UnitName} {ToolNameID}")
 
@@ -123,7 +124,7 @@ async def make_channels_matrix(cfg: object, id: int, pool: object) -> list:
     valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels]
 
     for num_nodo, valori in enumerate(valori_nodi, start=1):
-        matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori))))
+        matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
 
     return matrice_valori
 
@@ -155,7 +156,7 @@ async def make_musa_matrix(cfg: object, id: int, pool: object) -> list:
     valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels]
 
     for num_nodo, valori in enumerate(valori_nodi, start=1):
-        matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori))))
+        matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
 
     return matrice_valori
 
@@ -182,7 +183,7 @@ async def make_tlp_matrix(cfg: object, id: int, pool: object) -> list:
             lista_rilevazioni.append(barometer)
         valori_nodi = [lista_rilevazioni[i:i + valori_x_nodo] for i in range(0, len(lista_rilevazioni), valori_x_nodo)]
         for num_nodo, valori in enumerate(valori_nodi, start=1):
-            matrice_valori.append([UnitName, ToolNameID, num_nodo, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori))))
+            matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
 
     return matrice_valori
 
@@ -207,9 +208,16 @@ async def make_gd_matrix(cfg: object, id: int, pool: object) -> list:
         EventDate, EventTime = timestamp.split(' ')
         #logger.debug(f"GD id {id}: {pattern} {rilevazioni}")
         if re.search(pattern, rilevazioni):
+            if len(matrice_valori) == 0:
+                matrice_valori.append(['RSSI'])
             batlevel, temperature, rssi = rilevazioni.split(';')
             #logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {rssi}")
-            matrice_valori.append([UnitName, ToolNameID, 1, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + [rssi] + ([None] * 18))
+
+            gd_timestamp = datetime.strptime(f"{normalizza_data(EventDate)} {normalizza_orario(EventTime)}", "%Y-%m-%d %H:%M:%S")
+            start_timestamp = gd_timestamp - timedelta(seconds=45)
+            end_timestamp = gd_timestamp + timedelta(seconds=45)
+            matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 1, f"{start_timestamp:%Y-%m-%d %H:%M:%S}", f"{end_timestamp:%Y-%m-%d %H:%M:%S}", f"{gd_timestamp:%Y-%m-%d %H:%M:%S}", batlevel, temperature, int(rssi[:-2])])
+
         elif all(char == ';' for char in rilevazioni):
             pass
         elif ';|;' in rilevazioni:
@@ -217,7 +225,7 @@ async def make_gd_matrix(cfg: object, id: int, pool: object) -> list:
             unit_metrics, data = rilevazioni.split(';|;')
             batlevel, temperature = unit_metrics.split(';')
             #logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {data}")
             valori = data.split(';')
-            matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 2, date_check.conforma_data(EventDate), EventTime, batlevel, temperature] + valori + ([None] * (19 - len(valori))))
+            matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 2, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori))))
         else:
            logger.warning(f"GD id {id}: unhandled data - {rilevazioni}")
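
Note on the make_gd_matrix change above: an RSSI reading is re-keyed from the GD unit onto its DT tool and bracketed by a 45-second window on each side of its own timestamp, which load_data later uses to find the closest DT row. A minimal sketch of the window arithmetic, with a hypothetical timestamp; the int(rssi[:-2]) cast appears to assume the raw RSSI field ends in a two-character suffix that must be stripped, which is an inference, not something the patch states.

    from datetime import datetime, timedelta

    # Hypothetical, already-normalized GD timestamp.
    gd_timestamp = datetime.strptime("2025-07-27 12:00:30", "%Y-%m-%d %H:%M:%S")
    start_timestamp = gd_timestamp - timedelta(seconds=45)  # 2025-07-27 11:59:45
    end_timestamp = gd_timestamp + timedelta(seconds=45)    # 2025-07-27 12:01:15
    # The row carries window start, window end, and the exact timestamp, giving
    # load_data a 90-second interval in which to locate the matching DT record.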
diff --git a/utils/csv/loaders.py b/utils/csv/loaders.py
index 25fc7f7..83eab59 100644
--- a/utils/csv/loaders.py
+++ b/utils/csv/loaders.py
@@ -31,7 +31,7 @@ async def main_loader(cfg: object, id: int, pool: object, action: str) -> None:
             logger.info("matrice_valori created")
 
             # Load the data into the database
-            if await load_data(cfg, matrice_valori, pool):
+            if await load_data(cfg, matrice_valori, pool, type=action):
                 await update_status(cfg, id, WorkflowFlags.DATA_LOADED, pool)
                 await unlock(cfg, id, pool)
             else:
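
Note on the type=action threading above: make_gd_matrix marks an RSSI payload by prepending a one-element sentinel row, and load_data (below) checks for it before choosing between the UPDATE and the INSERT path. A hedged sketch of that handshake, with hypothetical values:

    # Sentinel handshake between make_gd_matrix and load_data (hypothetical row).
    matrice_valori = [
        ['RSSI'],                   # sentinel prepended once
        ["unit", "DT01", 1,
         "2025-07-27 11:59:45",     # window start
         "2025-07-27 12:01:15",     # window end
         "2025-07-27 12:00:30",     # GD timestamp
         "3.6", "21.5", -67],       # batlevel, temperature, rssi
    ]
    if matrice_valori[0][0] == "RSSI":
        matrice_valori.pop(0)       # load_data strips the marker row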
diff --git a/utils/database/loader_action.py b/utils/database/loader_action.py
index c640bf4..0a19d1a 100644
--- a/utils/database/loader_action.py
+++ b/utils/database/loader_action.py
@@ -6,7 +6,8 @@ from utils.database import FLAG_TO_TIMESTAMP, BATCH_SIZE
 
 logger = logging.getLogger(__name__)
 
-async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
+
+async def load_data(cfg: object, matrice_valori: list, pool: object, type: str) -> bool:
     """Load a list of raw data records into the database.
 
     Performs a bulk insert (executemany) to load the data.
@@ -17,6 +18,7 @@ async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
         cfg (object): The configuration object holding the table names and retry parameters.
         matrice_valori (list): A list of tuples, where each tuple represents one row to insert.
         pool (object): The database connection pool.
+        type (str): Kind of data load. For GD it updates the corresponding DT tool record instead of inserting.
 
     Returns:
         bool: True if the load succeeded, False otherwise.
@@ -24,44 +26,60 @@ async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
     if not matrice_valori:
         logger.info("Nothing to load.")
         return True
-    sql_insert_RAWDATA = f"""
-    INSERT INTO {cfg.dbrawdata} (
-        `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`,
-        `Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`,
-        `Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`,
-        `BatLevelModule`,`TemperatureModule`, `RssiModule`
-    )
-    VALUES (
-        %s, %s, %s, %s, %s, %s, %s,
-        %s, %s, %s, %s, %s, %s, %s, %s,
-        %s, %s, %s, %s, %s, %s, %s, %s,
-        %s, %s, %s
-    ) as new_data
-    ON DUPLICATE KEY UPDATE
-        `BatLevel` = IF({cfg.dbrawdata}.`BatLevel` != new_data.`BatLevel`, new_data.`BatLevel`, {cfg.dbrawdata}.`BatLevel`),
-        `Temperature` = IF({cfg.dbrawdata}.`Temperature` != new_data.Temperature, new_data.Temperature, {cfg.dbrawdata}.`Temperature`),
-        `Val0` = IF({cfg.dbrawdata}.`Val0` != new_data.Val0 AND new_data.`Val0` IS NOT NULL, new_data.Val0, {cfg.dbrawdata}.`Val0`),
-        `Val1` = IF({cfg.dbrawdata}.`Val1` != new_data.Val1 AND new_data.`Val1` IS NOT NULL, new_data.Val1, {cfg.dbrawdata}.`Val1`),
-        `Val2` = IF({cfg.dbrawdata}.`Val2` != new_data.Val2 AND new_data.`Val2` IS NOT NULL, new_data.Val2, {cfg.dbrawdata}.`Val2`),
-        `Val3` = IF({cfg.dbrawdata}.`Val3` != new_data.Val3 AND new_data.`Val3` IS NOT NULL, new_data.Val3, {cfg.dbrawdata}.`Val3`),
-        `Val4` = IF({cfg.dbrawdata}.`Val4` != new_data.Val4 AND new_data.`Val4` IS NOT NULL, new_data.Val4, {cfg.dbrawdata}.`Val4`),
-        `Val5` = IF({cfg.dbrawdata}.`Val5` != new_data.Val5 AND new_data.`Val5` IS NOT NULL, new_data.Val5, {cfg.dbrawdata}.`Val5`),
-        `Val6` = IF({cfg.dbrawdata}.`Val6` != new_data.Val6 AND new_data.`Val6` IS NOT NULL, new_data.Val6, {cfg.dbrawdata}.`Val6`),
-        `Val7` = IF({cfg.dbrawdata}.`Val7` != new_data.Val7 AND new_data.`Val7` IS NOT NULL, new_data.Val7, {cfg.dbrawdata}.`Val7`),
-        `Val8` = IF({cfg.dbrawdata}.`Val8` != new_data.Val8 AND new_data.`Val8` IS NOT NULL, new_data.Val8, {cfg.dbrawdata}.`Val8`),
-        `Val9` = IF({cfg.dbrawdata}.`Val9` != new_data.Val9 AND new_data.`Val9` IS NOT NULL, new_data.Val9, {cfg.dbrawdata}.`Val9`),
-        `ValA` = IF({cfg.dbrawdata}.`ValA` != new_data.ValA AND new_data.`ValA` IS NOT NULL, new_data.ValA, {cfg.dbrawdata}.`ValA`),
-        `ValB` = IF({cfg.dbrawdata}.`ValB` != new_data.ValB AND new_data.`ValB` IS NOT NULL, new_data.ValB, {cfg.dbrawdata}.`ValB`),
-        `ValC` = IF({cfg.dbrawdata}.`ValC` != new_data.ValC AND new_data.`ValC` IS NOT NULL, new_data.ValC, {cfg.dbrawdata}.`ValC`),
-        `ValD` = IF({cfg.dbrawdata}.`ValD` != new_data.ValD AND new_data.`ValD` IS NOT NULL, new_data.ValD, {cfg.dbrawdata}.`ValD`),
-        `ValE` = IF({cfg.dbrawdata}.`ValE` != new_data.ValE AND new_data.`ValE` IS NOT NULL, new_data.ValE, {cfg.dbrawdata}.`ValE`),
-        `ValF` = IF({cfg.dbrawdata}.`ValF` != new_data.ValF AND new_data.`ValF` IS NOT NULL, new_data.ValF, {cfg.dbrawdata}.`ValF`),
-        `BatLevelModule` = IF({cfg.dbrawdata}.`BatLevelModule` != new_data.BatLevelModule, new_data.BatLevelModule, {cfg.dbrawdata}.`BatLevelModule`),
-        `TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, {cfg.dbrawdata}.`TemperatureModule`),
-        `RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`),
-        `Created_at` = NOW()
-    """
-    #logger.info(f"Query insert: {sql_insert_RAWDATA}.")
+
+    if type == "gd" and matrice_valori[0][0] == "RSSI":
+        matrice_valori.pop(0)
+        sql_load_RAWDATA = f"""
+            UPDATE {cfg.dbrawdata} t1
+            JOIN (
+                SELECT id
+                FROM {cfg.dbrawdata}
+                WHERE UnitName = %s AND ToolNameID = %s AND NodeNum = %s
+                AND TIMESTAMP(`EventDate`, `EventTime`) BETWEEN %s AND %s
+                ORDER BY ABS(TIMESTAMPDIFF(SECOND, TIMESTAMP(`EventDate`, `EventTime`), %s))
+                LIMIT 1
+            ) t2 ON t1.id = t2.id
+            SET t1.BatLevelModule = %s, t1.TemperatureModule = %s, t1.RssiModule = %s
+        """
+    else:
+        sql_load_RAWDATA = f"""
+            INSERT INTO {cfg.dbrawdata} (
+                `UnitName`,`ToolNameID`,`NodeNum`,`EventDate`,`EventTime`,`BatLevel`,`Temperature`,
+                `Val0`,`Val1`,`Val2`,`Val3`,`Val4`,`Val5`,`Val6`,`Val7`,
+                `Val8`,`Val9`,`ValA`,`ValB`,`ValC`,`ValD`,`ValE`,`ValF`,
+                `BatLevelModule`,`TemperatureModule`, `RssiModule`
+            )
+            VALUES (
+                %s, %s, %s, %s, %s, %s, %s,
+                %s, %s, %s, %s, %s, %s, %s, %s,
+                %s, %s, %s, %s, %s, %s, %s, %s,
+                %s, %s, %s
+            ) as new_data
+            ON DUPLICATE KEY UPDATE
+                `BatLevel` = IF({cfg.dbrawdata}.`BatLevel` != new_data.`BatLevel`, new_data.`BatLevel`, {cfg.dbrawdata}.`BatLevel`),
+                `Temperature` = IF({cfg.dbrawdata}.`Temperature` != new_data.Temperature, new_data.Temperature, {cfg.dbrawdata}.`Temperature`),
+                `Val0` = IF({cfg.dbrawdata}.`Val0` != new_data.Val0 AND new_data.`Val0` IS NOT NULL, new_data.Val0, {cfg.dbrawdata}.`Val0`),
+                `Val1` = IF({cfg.dbrawdata}.`Val1` != new_data.Val1 AND new_data.`Val1` IS NOT NULL, new_data.Val1, {cfg.dbrawdata}.`Val1`),
+                `Val2` = IF({cfg.dbrawdata}.`Val2` != new_data.Val2 AND new_data.`Val2` IS NOT NULL, new_data.Val2, {cfg.dbrawdata}.`Val2`),
+                `Val3` = IF({cfg.dbrawdata}.`Val3` != new_data.Val3 AND new_data.`Val3` IS NOT NULL, new_data.Val3, {cfg.dbrawdata}.`Val3`),
+                `Val4` = IF({cfg.dbrawdata}.`Val4` != new_data.Val4 AND new_data.`Val4` IS NOT NULL, new_data.Val4, {cfg.dbrawdata}.`Val4`),
+                `Val5` = IF({cfg.dbrawdata}.`Val5` != new_data.Val5 AND new_data.`Val5` IS NOT NULL, new_data.Val5, {cfg.dbrawdata}.`Val5`),
+                `Val6` = IF({cfg.dbrawdata}.`Val6` != new_data.Val6 AND new_data.`Val6` IS NOT NULL, new_data.Val6, {cfg.dbrawdata}.`Val6`),
+                `Val7` = IF({cfg.dbrawdata}.`Val7` != new_data.Val7 AND new_data.`Val7` IS NOT NULL, new_data.Val7, {cfg.dbrawdata}.`Val7`),
+                `Val8` = IF({cfg.dbrawdata}.`Val8` != new_data.Val8 AND new_data.`Val8` IS NOT NULL, new_data.Val8, {cfg.dbrawdata}.`Val8`),
+                `Val9` = IF({cfg.dbrawdata}.`Val9` != new_data.Val9 AND new_data.`Val9` IS NOT NULL, new_data.Val9, {cfg.dbrawdata}.`Val9`),
+                `ValA` = IF({cfg.dbrawdata}.`ValA` != new_data.ValA AND new_data.`ValA` IS NOT NULL, new_data.ValA, {cfg.dbrawdata}.`ValA`),
+                `ValB` = IF({cfg.dbrawdata}.`ValB` != new_data.ValB AND new_data.`ValB` IS NOT NULL, new_data.ValB, {cfg.dbrawdata}.`ValB`),
+                `ValC` = IF({cfg.dbrawdata}.`ValC` != new_data.ValC AND new_data.`ValC` IS NOT NULL, new_data.ValC, {cfg.dbrawdata}.`ValC`),
+                `ValD` = IF({cfg.dbrawdata}.`ValD` != new_data.ValD AND new_data.`ValD` IS NOT NULL, new_data.ValD, {cfg.dbrawdata}.`ValD`),
+                `ValE` = IF({cfg.dbrawdata}.`ValE` != new_data.ValE AND new_data.`ValE` IS NOT NULL, new_data.ValE, {cfg.dbrawdata}.`ValE`),
+                `ValF` = IF({cfg.dbrawdata}.`ValF` != new_data.ValF AND new_data.`ValF` IS NOT NULL, new_data.ValF, {cfg.dbrawdata}.`ValF`),
+                `BatLevelModule` = IF({cfg.dbrawdata}.`BatLevelModule` != new_data.BatLevelModule, new_data.BatLevelModule, {cfg.dbrawdata}.`BatLevelModule`),
+                `TemperatureModule` = IF({cfg.dbrawdata}.`TemperatureModule` != new_data.TemperatureModule, new_data.TemperatureModule, {cfg.dbrawdata}.`TemperatureModule`),
+                `RssiModule` = IF({cfg.dbrawdata}.`RssiModule` != new_data.RssiModule, new_data.RssiModule, {cfg.dbrawdata}.`RssiModule`),
+                `Created_at` = NOW()
+        """
+    #logger.info(f"Query insert: {sql_load_RAWDATA}.")
     #logger.info(f"Matrice valori da inserire: {matrice_valori}.")
     rc = False
     async with pool.acquire() as conn:
@@ -73,7 +91,7 @@ async def load_data(cfg: object, matrice_valori: list, pool: object) -> bool:
 
             for i in range(0, len(matrice_valori), BATCH_SIZE):
                 batch = matrice_valori[i:i + BATCH_SIZE]
-                await cur.executemany(sql_insert_RAWDATA, batch)
+                await cur.executemany(sql_load_RAWDATA, batch)
                 await conn.commit()
                 logger.info(f"Completed batch {i//BATCH_SIZE + 1}/{(len(matrice_valori)-1)//BATCH_SIZE + 1}")
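
For review: the GD UPDATE consumes each nine-field row positionally, six placeholders in the derived-table subquery and three in the SET clause, so the row layout built in make_gd_matrix must stay in exactly this order. A sketch of the expected parameter tuple, with hypothetical values:

    # Order must match the %s placeholders in sql_load_RAWDATA above.
    params = (
        "unit",                 # UnitName
        "DT01",                 # ToolNameID (already rewritten from GD to DT)
        1,                      # NodeNum
        "2025-07-27 11:59:45",  # window start  -> BETWEEN %s ...
        "2025-07-27 12:01:15",  # window end    -> ... AND %s
        "2025-07-27 12:00:30",  # GD timestamp  -> ORDER BY ABS(TIMESTAMPDIFF(SECOND, ..., %s))
        "3.6",                  # SET t1.BatLevelModule
        "21.5",                 # SET t1.TemperatureModule
        -67,                    # SET t1.RssiModule
    )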
diff --git a/utils/timestamp/date_check.py b/utils/timestamp/date_check.py
index 67c128b..615f954 100644
--- a/utils/timestamp/date_check.py
+++ b/utils/timestamp/date_check.py
@@ -1,14 +1,14 @@
 from datetime import datetime
 
-def conforma_data(data_string: str)->str:
+def normalizza_data(data_string: str) -> str:
     """
-    Conform a date string to the YYYY-MM-DD format, trying several input formats.
+    Normalize a date string to the YYYY-MM-DD format, trying several input formats.
 
     Args:
-        data_string (str): The date string to conform.
+        data_string (str): The date string to normalize.
 
     Returns:
-        str: The conformed date in YYYY-MM-DD format,
+        str: The normalized date in YYYY-MM-DD format,
              or None if the string cannot be interpreted as a date.
     """
     formato_desiderato = "%Y-%m-%d"
@@ -21,4 +21,17 @@ def conforma_data(data_string: str)->str:
         except ValueError:
             continue  # Try the next format if the current one fails
 
-    return None  # If no format succeeded
\ No newline at end of file
+    return None  # If no format succeeded
+
+def normalizza_orario(orario_str: str) -> str:
+    try:
+        # Try HH:MM:SS first
+        dt = datetime.strptime(orario_str, "%H:%M:%S")
+        return dt.strftime("%H:%M:%S")
+    except ValueError:
+        try:
+            # Fall back to HH:MM
+            dt = datetime.strptime(orario_str, "%H:%M")
+            return dt.strftime("%H:%M:%S")
+        except ValueError:
+            return orario_str  # Return the original string if unparsable
\ No newline at end of file
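
Usage sketch for the renamed and new helpers, matching the behaviour implemented above (normalizza_data's accepted input formats are defined inside the helper and not shown in this hunk):

    from utils.timestamp.date_check import normalizza_data, normalizza_orario

    normalizza_orario("09:30")     # -> "09:30:00"  (HH:MM padded to HH:MM:SS)
    normalizza_orario("09:30:15")  # -> "09:30:15"  (already normalized)
    normalizza_orario("bogus")     # -> "bogus"     (unparsable input returned unchanged)
    normalizza_data("27/07/2025")  # -> "2025-07-27" if that format is among those the helper tries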