Files
ASE/utils/csv/data_preparation.py
2025-05-26 22:38:19 +02:00

133 lines
6.6 KiB
Python

#!.venv/bin/python
from utils.database.connection import connetti_db
from utils.database.nodes_query import get_nodes_type
import utils.timestamp.date_check as date_check
import logging
import re
from itertools import islice
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def get_data(cfg: object, id: int) -> tuple:
    """
    Fetch the raw record of one tool from the database.

    Opens a connection with `connetti_db(cfg)`, selects the `unit_name`,
    `tool_name` and `tool_data` columns from `{cfg.dbname}.{cfg.dbrectable}`
    for the row whose `id` column equals `id`, and returns them.

    Args:
        cfg: Configuration object. It is passed to `connetti_db`, and its
            `dbname` and `dbrectable` attributes are interpolated into the
            query text as schema and table name.
        id: Value matched against the `id` column. It is coerced with
            `int()` before being placed in the query text.

    Returns:
        A tuple `(unit_name, tool_name, tool_data)`.

    Raises:
        TypeError, ValueError: If `id` cannot be converted to an integer.
        TypeError: If no row matches, because the result of `fetchone()`
            (None for DB-API cursors with no row) cannot be unpacked.
    """
    # Identifiers (schema/table) cannot be bound as query parameters, so the
    # statement is still built as text; forcing the id through int() makes
    # sure nothing but a number can ever reach the SQL string.
    record_id = int(id)
    with connetti_db(cfg) as conn:
        cur = conn.cursor()
        try:
            cur.execute(f'select unit_name, tool_name, tool_data from {cfg.dbname}.{cfg.dbrectable} where id = {record_id}')
            unit_name, tool_name, tool_data = cur.fetchone()
        finally:
            # Release the cursor even when the query or the unpacking fails.
            cur.close()
        conn.close()
    return unit_name, tool_name, tool_data
def make_pipe_sep_matrix(cfg: object, id: int) -> list:
    """
    Turn pipe-separated raw tool data into rows ready for database insertion.

    The raw text returned by `get_data` is split into lines; only lines
    containing the `';|;'` separator are processed. Each such line is split
    into timestamp, battery level, temperature and the remaining readings.
    The readings are split on `';|;'` into one group per node, and every
    group produces one output row.

    Args:
        cfg: Configuration object forwarded to `get_data`.
        id: The ID of the raw record to process, forwarded to `get_data`.

    Returns:
        A list of rows. Each row is a list made of: UnitName, ToolNameID,
        node number (starting at 1 within each line), the event date as
        returned by `date_check.conforma_data`, the event time, battery
        level, temperature, followed by the node's values. The value part
        is padded with `None` up to 19 entries; groups with more than 19
        values are not truncated. Values are kept as the strings found in
        the raw data.

    Raises:
        ValueError: If a processed line has fewer than four `;`-separated
            fields, or its timestamp does not consist of exactly two
            space-separated parts.
    """
    UnitName, ToolNameID, ToolData = get_data(cfg, id)
    matrice_valori = []
    for riga in ToolData.splitlines():
        if ';|;' not in riga:
            continue
        timestamp, batlevel, temperature, rilevazioni = riga.split(';', 3)
        EventDate, EventTime = timestamp.split(' ')
        # The date is the same for every node of this line: convert it once.
        data_evento = date_check.conforma_data(EventDate)
        # Remove exactly one leading '|;' marker. str.lstrip('|;') would strip
        # every leading '|' and ';' character, swallowing empty values of the
        # first node and shifting all following values to the wrong node.
        if rilevazioni.startswith('|;'):
            rilevazioni = rilevazioni[2:]
        # Drop trailing ';' characters, then split into one group per node.
        valori_nodi = rilevazioni.rstrip(';').split(';|;')
        for num_nodo, valori_nodo in enumerate(valori_nodi, start=1):
            valori = valori_nodo.split(';')
            # A negative count yields an empty list, so long groups stay intact.
            riempimento = [None] * (19 - len(valori))
            matrice_valori.append(
                [UnitName, ToolNameID, num_nodo, data_evento, EventTime, batlevel, temperature]
                + valori
                + riempimento
            )
    return matrice_valori
def make_ain_din_matrix(cfg: object, id: int) -> list:
    """
    Turn raw analog/digital input readings into rows ready for database insertion.

    The raw text returned by `get_data` is split into lines. Only lines that
    fully match the expected layout are processed: a date written as
    `dddd/dd/dd` or `dd/dd/dddd`, a space, a `dd:dd:dd` time, two decimal
    fields (battery level, temperature) and four integer fields (two analog
    inputs followed by two digital inputs), all separated by `;`.

    Which inputs produce rows depends on the node description returned by
    `get_nodes_type`:

    * if any entry of `node_ains` is truthy, the two analog inputs produce
      rows numbered 1 and 2;
    * if any entry of `node_dins` is truthy, the two digital inputs produce
      rows numbered 3 and 4 when analog inputs are present, otherwise 1 and 2.

    When one of the two kinds is absent an info message is logged for every
    processed line. When both `node_ains` and `node_dins` are falsy no line
    is processed at all.

    Args:
        cfg: Configuration object forwarded to `get_data` and `get_nodes_type`.
        id: The ID of the raw record to process, forwarded to `get_data`.

    Returns:
        A list of rows. Each row is a list made of: UnitName, ToolNameID,
        node number, the event date as returned by
        `date_check.conforma_data`, the event time, battery level,
        temperature, the input value, and 18 `None` entries so that the
        value part always has 19 entries.
    """
    UnitName, ToolNameID, ToolData = get_data(cfg, id)
    _node_channels, _node_types, node_ains, node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
    matrice_valori = []
    if not (node_ains or node_dins):
        return matrice_valori

    # Loop-invariant work is done once. The bool() guard keeps any() from
    # being called on a falsy non-iterable such as None.
    has_ain = bool(node_ains) and any(node_ains)
    has_din = bool(node_dins) and any(node_dins)
    pattern = re.compile(r'^(?:\d{4}\/\d{2}\/\d{2}|\d{2}\/\d{2}\/\d{4}) \d{2}:\d{2}:\d{2}(?:;\d+\.\d+){2}(?:;\d+){4}$')
    riempimento = [None] * (19 - 1)
    # Digital inputs are numbered after the analog ones when both are present.
    first_din_node = 3 if has_ain else 1

    for riga in ToolData.splitlines():
        if not pattern.match(riga):
            continue
        timestamp, batlevel, temperature, analog_input1, analog_input2, digital_input1, digital_input2 = riga.split(';')
        EventDate, EventTime = timestamp.split(' ')
        if has_ain or has_din:
            # Same date for every row generated from this line: convert once.
            data_evento = date_check.conforma_data(EventDate)
        if has_ain:
            for node_num, analog_act in enumerate([analog_input1, analog_input2], start=1):
                matrice_valori.append([UnitName, ToolNameID, node_num, data_evento, EventTime, batlevel, temperature] + [analog_act] + riempimento)
        else:
            logger.info(f"Nessun Ingresso analogico per {UnitName} {ToolNameID}")
        if has_din:
            for node_num, digital_act in enumerate([digital_input1, digital_input2], start=first_din_node):
                matrice_valori.append([UnitName, ToolNameID, node_num, data_evento, EventTime, batlevel, temperature] + [digital_act] + riempimento)
        else:
            logger.info(f"Nessun Ingresso digitale per {UnitName} {ToolNameID}")
    return matrice_valori
def make_channels_matrix(cfg: object, id: int) -> list:
    """
    Turn raw tool data into rows, assigning values to nodes by channel count.

    The raw text returned by `get_data` is split into lines; only lines
    containing `';|;'` are processed. Each such line is split into timestamp,
    battery level, temperature and the remaining readings. The readings are
    split on `;`, the `'|'` tokens are discarded, and the remaining values
    are handed out to the nodes in order: each node takes as many values as
    its entry in `node_channels` (from `get_nodes_type`) says. Values left
    over after the last node are ignored; a node for which no values remain
    gets an empty value list.

    Args:
        cfg: Configuration object forwarded to `get_data` and `get_nodes_type`.
        id: The ID of the raw record to process, forwarded to `get_data`.

    Returns:
        A list of rows. Each row is a list made of: UnitName, ToolNameID,
        node number (starting at 1 within each line), the event date as
        returned by `date_check.conforma_data`, the event time, battery
        level, temperature, followed by the node's values padded with
        `None` up to 19 entries.
    """
    UnitName, ToolNameID, ToolData = get_data(cfg, id)
    node_channels, _node_types, _node_ains, _node_dins = get_nodes_type(cfg, ToolNameID, UnitName)
    rows = []
    for line in ToolData.splitlines():
        if ';|;' not in line:
            continue
        timestamp, batlevel, temperature, readings = line.split(';', 3)
        EventDate, EventTime = timestamp.split(' ')
        # One shared iterator, so each node continues where the previous stopped.
        tokens = (token for token in readings.split(';') if token != '|')
        per_node = [list(islice(tokens, n_channels)) for n_channels in node_channels]
        node_num = 0
        for node_values in per_node:
            node_num += 1
            head = [UnitName, ToolNameID, node_num, date_check.conforma_data(EventDate), EventTime, batlevel, temperature]
            filler = [None] * (19 - len(node_values))
            rows.append(head + node_values + filler)
    return rows