Files
cvs_ase/transform_file.py
2024-11-10 15:58:19 +01:00

93 lines
3.6 KiB
Python

import json
import psycopg2
from sqlalchemy import create_engine, text
from sqlalchemy.dialects.postgresql import insert
# Configura la connessione al database PostgreSQL
engine = create_engine('postgresql://asepg:batt1l0@10.211.114.101:5432/asedb')
# Leggi il file intero e separa l'intestazione dal resto dei dati
with open('DT0029_20241106044856.csv', 'r') as file:
lines = file.readlines()
# Estrarre le informazioni dalle prime 7 righe
if len(lines) >= 7:
tipo_centralina = lines[1].split()[0] # Prima stringa nella seconda riga
unit = lines[1].split()[1] # Seconda stringa nella seconda riga
ip_centralina = lines[2].split()[1] # IP della centralina dalla terza riga
ip_gateway = lines[4].split()[1] # IP del gateway dalla quinta riga
path = lines[5].split()[2] # Path dalla quinta riga
else:
raise ValueError("Il file non contiene abbastanza righe per estrarre i dati richiesti.")
# Elabora le righe dei dati a partire dalla riga 8 in poi
for line in lines[7:]:
# Rimuovi spazi bianchi o caratteri di nuova riga
input_data = line.strip()
# Suddividi la stringa in sezioni usando ";|;" come separatore
parts = input_data.split(';|;')
# Verifica che ci siano almeno tre parti (timestamp, misure e nodi)
if len(parts) < 3:
print(f"Riga non valida: {input_data}")
continue
# Estrai la data/ora e le prime misurazioni
timestamp = parts[0]
measurements = parts[1]
# Estrai i valori di ciascun nodo e formatta i dati come JSON
nodes = parts[2:]
node_list = []
records = []
for i, node_data in enumerate(nodes, start=1):
# Dividi ogni nodo in valori separati da ";"
node_values = node_data.split(';')
# Imposta i valori a -1 se trovi "Dis.", altrimenti convertili in float
val1 = -9999 if node_values[0] == "Dis." else float(node_values[0])
val2 = -9999 if len(node_values) > 1 and node_values[1] == "Dis." else (
float(node_values[1]) if len(node_values) > 1 else None
)
node_dict = {
"num": i,
"val1": val1,
"val2": val2
}
node_list.append(node_dict)
# Prepara i dati per l'inserimento/aggiornamento
record = {
"tipo_centralina": tipo_centralina,
"unit": unit,
"ip_centralina": ip_centralina,
"path": path,
"ip_gateway": ip_gateway,
"event_timestamp": timestamp,
"battery_level": float(measurements.split(';')[0]),
"temperature": float(measurements.split(';')[1]),
"nodes_jsonb": json.dumps(node_list) # Converti la lista di dizionari in una stringa JSON
}
records.append(record)
# Esegui l'upsert con un comando SQL
with engine.connect() as conn:
with conn.execution_options(isolation_level='AUTOCOMMIT'):
result = conn.execute(text("""
INSERT INTO dataraw (tipo_centralina, unit, ip_centralina, path, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb)
VALUES (:tipo_centralina, :unit, :ip_centralina, :path, :ip_gateway, :event_timestamp, :battery_level, :temperature, :nodes_jsonb)
ON CONFLICT ON CONSTRAINT dataraw_unique
DO UPDATE SET
path = EXCLUDED.path,
ip_gateway = EXCLUDED.ip_gateway,
battery_level = EXCLUDED.battery_level,
temperature = EXCLUDED.temperature,
nodes_jsonb = EXCLUDED.nodes_jsonb;
"""), records)
print(result)
print("Tutte le righe del file sono state caricate con successo nella tabella PostgreSQL!")