#!.venv/bin/python from utils.database.nodes_query import get_nodes_type from utils.timestamp.date_check import normalizza_data, normalizza_orario from utils.database.loader_action import find_nearest_timestamp import logging import re from itertools import islice from datetime import datetime, timedelta logger = logging.getLogger(__name__) async def get_data(cfg: object, id: int, pool: object) -> tuple: """ Retrieves unit name, tool name, and tool data for a given record ID from the database. Args: cfg (object): Configuration object containing database table name. id (int): The ID of the record to retrieve. pool (object): The database connection pool. Returns: tuple: A tuple containing unit_name, tool_name, and tool_data. """ async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbrectable} where id = {id}') unit_name, tool_name, tool_data = await cur.fetchone() return unit_name, tool_name, tool_data async def make_pipe_sep_matrix(cfg: object, id: int, pool: object) -> list: """ Processes pipe-separated data from a CSV record into a structured matrix. Args: cfg (object): Configuration object. id (int): The ID of the CSV record. pool (object): The database connection pool. Returns: list: A list of lists, where each inner list represents a row in the matrix. """ UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) righe = ToolData.splitlines() matrice_valori = [] """ Ciclo su tutte le righe del file CSV, escludendo quelle che: non hanno il pattern ';|;' perché non sono dati ma è la header che hanno il pattern 'No RX' perché sono letture non pervenute o in errore che hanno il pattern '.-' perché sono letture con un numero errato - negativo dopo la virgola che hanno il pattern 'File Creation' perché vuol dire che c'è stato un errore della centralina """ for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]: timestamp, batlevel, temperature, rilevazioni = riga.split(';',3) EventDate, EventTime = timestamp.split(' ') if batlevel == '|': batlevel = temperature temperature, rilevazioni = rilevazioni.split(';',1) ''' in alcune letture mancano temperatura e livello batteria''' if temperature == '': temperature = 0 if batlevel == '': batlevel = 0 valori_nodi = rilevazioni.lstrip('|;').rstrip(';').split(';|;') # Toglie '|;' iniziali, toglie eventuali ';' finali, dividi per ';|;' for num_nodo, valori_nodo in enumerate(valori_nodi, start=1): valori = valori_nodo.split(';') matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori)))) return matrice_valori async def make_ain_din_matrix(cfg: object, id: int, pool: object) -> list: """ Processes analog and digital input data from a CSV record into a structured matrix. Args: cfg (object): Configuration object. id (int): The ID of the CSV record. pool (object): The database connection pool. Returns: list: A list of lists, where each inner list represents a row in the matrix. """ UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool) righe = ToolData.splitlines() matrice_valori = [] pattern = r'^(?:\d{4}\/\d{2}\/\d{2}|\d{2}\/\d{2}\/\d{4}) \d{2}:\d{2}:\d{2}(?:;\d+\.\d+){2}(?:;\d+){4}$' if node_ains or node_dins: for riga in [riga for riga in righe if re.match(pattern, riga)]: timestamp, batlevel, temperature, analog_input1, analog_input2, digital_input1, digital_input2 = riga.split(';') EventDate, EventTime = timestamp.split(' ') if any(node_ains): for node_num, analog_act in enumerate([analog_input1, analog_input2], start=1): matrice_valori.append([UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + [analog_act] + ([None] * (19 - 1))) else: logger.info(f"Nessun Ingresso analogico per {UnitName} {ToolNameID}") if any(node_dins): start_node = 3 if any(node_ains) else 1 for node_num, digital_act in enumerate([digital_input1, digital_input2], start=start_node): matrice_valori.append([UnitName, ToolNameID, node_num, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + [digital_act] + ([None] * (19 - 1))) else: logger.info(f"Nessun Ingresso digitale per {UnitName} {ToolNameID}") return matrice_valori async def make_channels_matrix(cfg: object, id: int, pool: object) -> list: """ Processes channel-based data from a CSV record into a structured matrix. Args: cfg (object): Configuration object. id (int): The ID of the CSV record. pool (object): The database connection pool. Returns: list: A list of lists, where each inner list represents a row in the matrix. """ UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool) righe = ToolData.splitlines() matrice_valori = [] for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]: timestamp, batlevel, temperature, rilevazioni = riga.replace(';|;',';').split(';',3) EventDate, EventTime = timestamp.split(' ') valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|'] valori_iter = iter(valori_splitted) valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels] for num_nodo, valori in enumerate(valori_nodi, start=1): matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori)))) return matrice_valori async def make_musa_matrix(cfg: object, id: int, pool: object) -> list: """ Processes 'Musa' specific data from a CSV record into a structured matrix. Args: cfg (object): Configuration object. id (int): The ID of the CSV record. pool (object): The database connection pool. Returns: list: A list of lists, where each inner list represents a row in the matrix. """ UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) node_channels, node_types, node_ains, node_dins = await get_nodes_type(cfg, ToolNameID, UnitName, pool) righe = ToolData.splitlines() matrice_valori = [] for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]: timestamp, batlevel, rilevazioni = riga.replace(';|;',';').split(';',2) if timestamp == '': continue EventDate, EventTime = timestamp.split(' ') temperature = rilevazioni.split(';')[0] logger.info(f'{temperature}, {rilevazioni}') valori_splitted = [valore for valore in rilevazioni.split(';') if valore != '|'] valori_iter = iter(valori_splitted) valori_nodi = [list(islice(valori_iter, channels)) for channels in node_channels] for num_nodo, valori in enumerate(valori_nodi, start=1): matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori)))) return matrice_valori async def make_tlp_matrix(cfg: object, id: int, pool: object) -> list: """ Processes 'TLP' specific data from a CSV record into a structured matrix. Args: cfg (object): Configuration object. id (int): The ID of the CSV record. pool (object): The database connection pool. Returns: list: A list of lists, where each inner list represents a row in the matrix. """ UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) righe = ToolData.splitlines() valori_x_nodo = 2 matrice_valori = [] for riga in righe: timestamp, batlevel, temperature, barometer, rilevazioni = riga.split(';',4) EventDate, EventTime = timestamp.split(' ') lista_rilevazioni = rilevazioni.strip(';').split(';') lista_rilevazioni.append(barometer) valori_nodi = [lista_rilevazioni[i:i + valori_x_nodo] for i in range(0, len(lista_rilevazioni), valori_x_nodo)] for num_nodo, valori in enumerate(valori_nodi, start=1): matrice_valori.append([UnitName, ToolNameID, num_nodo, normalizza_data(EventDate), normalizza_orario(EventTime), batlevel, temperature] + valori + ([None] * (19 - len(valori)))) return matrice_valori async def make_gd_matrix(cfg: object, id: int, pool: object) -> list: """ Processes 'GD' specific data from a CSV record into a structured matrix. Args: cfg (object): Configuration object. id (int): The ID of the CSV record. pool (object): The database connection pool. Returns: list: A list of lists, where each inner list represents a row in the matrix. """ UnitName, ToolNameID, ToolData = await get_data(cfg, id, pool) righe = ToolData.splitlines() matrice_valori = [] pattern = r';-?\d+dB$' for riga in [riga for riga in righe if ';|;' in riga and 'No RX' not in riga and '.-' not in riga and 'File Creation' not in riga and riga.isprintable()]: timestamp, rilevazioni = riga.split(';|;',1) EventDate, EventTime = timestamp.split(' ') #logger.debug(f"GD id {id}: {pattern} {rilevazioni}") if re.search(pattern, rilevazioni): if len(matrice_valori) == 0: matrice_valori.append(['RSSI']) batlevel, temperature, rssi = rilevazioni.split(';') #logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {rssi}") gd_timestamp = datetime.strptime(f"{normalizza_data(EventDate)} {normalizza_orario(EventTime)}", "%Y-%m-%d %H:%M:%S") start_timestamp = gd_timestamp - timedelta(seconds=45) end_timestamp = gd_timestamp + timedelta(seconds=45) matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 1, f"{start_timestamp:%Y-%m-%d %H:%M:%S}", f"{end_timestamp:%Y-%m-%d %H:%M:%S}", f"{gd_timestamp:%Y-%m-%d %H:%M:%S}", batlevel, temperature, int(rssi[:-2])]) elif all(char == ';' for char in rilevazioni): pass elif ';|;' in rilevazioni: unit_metrics, data = rilevazioni.split(';|;') batlevel, temperature = unit_metrics.split(';') #logger.debug(f"GD id {id}: {EventDate}, {EventTime}, {batlevel}, {temperature}, {data}") dt_timestamp, dt_batlevel, dt_temperature = await find_nearest_timestamp(cfg, {"timestamp": f"{normalizza_data(EventDate)} {normalizza_orario(EventTime)}", "unit": UnitName, "tool": ToolNameID.replace("GD", "DT"), "node_num": 1}, pool) EventDate, EventTime = dt_timestamp.strftime('%Y-%m-%d %H:%M:%S').split(' ') valori = data.split(';') matrice_valori.append([UnitName, ToolNameID.replace("GD", "DT"), 2, EventDate, EventTime, float(dt_batlevel), float(dt_temperature)] + valori + ([None] * (16 - len(valori))) + [batlevel, temperature, None]) else: logger.warning(f"GD id {id}: dati non trattati - {rilevazioni}") return matrice_valori