import json import psycopg2 from sqlalchemy import create_engine, text from sqlalchemy.dialects.postgresql import insert # Configura la connessione al database PostgreSQL engine = create_engine('postgresql://asepg:batt1l0@10.211.114.101:5432/asedb') # Leggi il file intero e separa l'intestazione dal resto dei dati with open('DT0029_20241106044856.csv', 'r') as file: lines = file.readlines() # Estrarre le informazioni dalle prime 7 righe if len(lines) >= 7: tipo_centralina = lines[1].split()[0] # Prima stringa nella seconda riga unit = lines[1].split()[1] # Seconda stringa nella seconda riga ip_centralina = lines[2].split()[1] # IP della centralina dalla terza riga ip_gateway = lines[4].split()[1] # IP del gateway dalla quinta riga path = lines[5].split()[2] # Path dalla quinta riga else: raise ValueError("Il file non contiene abbastanza righe per estrarre i dati richiesti.") records = [] # Elabora le righe dei dati a partire dalla riga 8 in poi for line in lines[7:]: # Rimuovi spazi bianchi o caratteri di nuova riga input_data = line.strip() # Suddividi la stringa in sezioni usando ";|;" come separatore parts = input_data.split(';|;') # Verifica che ci siano almeno tre parti (timestamp, misure e nodi) if len(parts) < 3: print(f"Riga non valida: {input_data}") continue # Estrai la data/ora e le prime misurazioni timestamp = parts[0] measurements = parts[1] # Estrai i valori di ciascun nodo e formatta i dati come JSON nodes = parts[2:] node_list = [] for i, node_data in enumerate(nodes, start=1): # Dividi ogni nodo in valori separati da ";" node_values = node_data.split(';') # Imposta i valori a -1 se trovi "Dis.", altrimenti convertili in float val1 = -9999 if node_values[0] == "Dis." else float(node_values[0]) val2 = -9999 if len(node_values) > 1 and node_values[1] == "Dis." else ( float(node_values[1]) if len(node_values) > 1 else None ) node_dict = { "num": i, "val1": val1, "val2": val2 } node_list.append(node_dict) # Prepara i dati per l'inserimento/aggiornamento record = { "tipo_centralina": tipo_centralina, "unit": unit, "ip_centralina": ip_centralina, "path": path, "ip_gateway": ip_gateway, "event_timestamp": timestamp, "battery_level": float(measurements.split(';')[0]), "temperature": float(measurements.split(';')[1]), "nodes_jsonb": json.dumps(node_list) # Converti la lista di dizionari in una stringa JSON } records.append(record) with engine.connect() as conn: conn.execute(text(""" INSERT INTO dataraw (tipo_centralina, unit, ip_centralina, path, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb) VALUES """ + ",".join([ f"(:{i}_tipo_centralina, :{i}_unit, :{i}_ip_centralina, :{i}_path, :{i}_ip_gateway, :{i}_event_timestamp, :{i}_battery_level, :{i}_temperature, :{i}_nodes_jsonb)" for i in range(len(records)) ]) + """ ON CONFLICT ON CONSTRAINT dataraw_unique DO UPDATE SET path = EXCLUDED.path, ip_gateway = EXCLUDED.ip_gateway, battery_level = EXCLUDED.battery_level, temperature = EXCLUDED.temperature, nodes_jsonb = EXCLUDED.nodes_jsonb; """), {f"{i}_{key}": value for i, record in enumerate(records) for key, value in record.items()}) conn.commit() print("Tutte le righe del file sono state caricate con successo nella tabella PostgreSQL!")