import pandas as pd import json import psycopg2 from sqlalchemy import create_engine # Configura la connessione al database PostgreSQL engine = create_engine('postgresql://asepg:batt1l0@10.211.114.101:5432/asedb') # Leggi il file intero e separa l'intestazione dal resto dei dati with open('DT0029_20241106044856.csv', 'r') as file: lines = file.readlines() # Estrarre le informazioni dalle prime 7 righe if len(lines) >= 7: tipo_centralina = lines[1].split()[0] # Prima stringa nella seconda riga unit = lines[1].split()[1] # Seconda stringa nella seconda riga ip_centralina = lines[2].split()[1] # IP della centralina dalla terza riga ip_gateway = lines[4].split()[1] # IP del gateway dalla quinta riga path = lines[5].split()[2] # Path dalla quinta riga else: raise ValueError("Il file non contiene abbastanza righe per estrarre i dati richiesti.") # Elabora le righe dei dati a partire dalla riga 8 in poi for line in lines[7:]: # Rimuovi spazi bianchi o caratteri di nuova riga input_data = line.strip() # Suddividi la stringa in sezioni usando ";|;" come separatore parts = input_data.split(';|;') # Verifica che ci siano almeno tre parti (timestamp, misure e nodi) if len(parts) < 3: print(f"Riga non valida: {input_data}") continue # Estrai la data/ora e le prime misurazioni timestamp = parts[0] measurements = parts[1] # Estrai i valori di ciascun nodo e formatta i dati come JSON nodes = parts[2:] node_list = [] for i, node_data in enumerate(nodes, start=1): # Dividi ogni nodo in valori separati da ";" node_values = node_data.split(';') # Imposta i valori a -1 se trovi "Dis.", altrimenti convertili in float val1 = -9999 if node_values[0] == "Dis." else float(node_values[0]) val2 = -9999 if len(node_values) > 1 and node_values[1] == "Dis." else ( float(node_values[1]) if len(node_values) > 1 else None ) node_dict = { "num": i, "val1": val1, "val2": val2 } node_list.append(node_dict) # Crea il DataFrame con una colonna JSONB per i nodi df = pd.DataFrame([{ "event_timestamp": pd.to_datetime(timestamp), "tipo_centralina": tipo_centralina, "unit": unit, "ip_centralina": ip_centralina, "path": path, "ip_gateway": ip_gateway, "battery_level": float(measurements.split(';')[0]), "temperature": float(measurements.split(';')[1]), "nodes_jsonb": json.dumps(node_list) # Converti la lista di dizionari in una stringa JSON }]) # Carica il DataFrame in una tabella PostgreSQL df.to_sql('dataraw', engine, if_exists='append', index=False) print("Tutte le righe del file sono state caricate con successo nella tabella PostgreSQL!")