import json import psycopg2 from sqlalchemy import create_engine, text # Configura la connessione al database PostgreSQL engine = create_engine('postgresql://asepg:batt1l0@10.211.114.101:5432/asedb') def write_db(engine, records): with engine.connect() as conn: conn.execute(text(""" INSERT INTO dataraw (nome_unit, tipo_centralina, nome_tool, tipo_tool, ip_centralina, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb) VALUES """ + ",".join([ f"(:{i}_nome_unit, :{i}_tipo_centralina, :{i}_nome_tool, :{i}_tipo_tool, :{i}_ip_centralina, :{i}_ip_gateway, :{i}_event_timestamp, :{i}_battery_level, :{i}_temperature, :{i}_nodes_jsonb)" for i in range(len(records)) ]) + """ ON CONFLICT ON CONSTRAINT dataraw_unique DO UPDATE SET tipo_centralina = EXCLUDED.tipo_centralina, tipo_tool = EXCLUDED.tipo_tool, ip_centralina = EXCLUDED.ip_centralina, ip_gateway = EXCLUDED.ip_gateway, battery_level = EXCLUDED.battery_level, temperature = EXCLUDED.temperature, nodes_jsonb = EXCLUDED.nodes_jsonb; """), {f"{i}_{key}": value for i, record in enumerate(records) for key, value in record.items()}) conn.commit() # Leggi il file intero e separa l'intestazione dal resto dei dati with open('DT0029_20241106044856.csv', 'r') as file: lines = file.readlines() # Estrarre le informazioni dalle prime 7 righe if len(lines) >= 7: tipo_centralina = lines[1].split()[0] # Prima stringa nella seconda riga nome_unit = lines[1].split()[1] # Seconda stringa nella seconda riga ip_centralina = lines[2].split()[1] # IP della centralina dalla terza riga ip_gateway = lines[4].split()[1] # IP del gateway dalla quinta riga path_tool = lines[5].strip() # Path completo dalla sesta riga nome_tool = path_tool.split('/')[-1].replace('.csv', '') # Ultima parte del percorso senza estensione tipo_tool = path_tool.split('/')[-2] # Parte precedente al nome_tool else: raise ValueError("Il file non contiene abbastanza righe per estrarre i dati richiesti.") records = [] # Elabora le righe dei dati a partire dalla riga 8 in poi for line in lines[7:]: # Rimuovi spazi bianchi o caratteri di nuova riga input_data = line.strip() # Suddividi la stringa in sezioni usando ";|;" come separatore parts = input_data.split(';|;') # Verifica che ci siano almeno tre parti (timestamp, misure e nodi) if len(parts) < 3: print(f"Riga non valida: {input_data}") continue # Estrai la data/ora e le prime misurazioni timestamp = parts[0] measurements = parts[1] # Estrai i valori di ciascun nodo e formatta i dati come JSON nodes = parts[2:] node_list = [] for i, node_data in enumerate(nodes, start=1): node_dict = {"num": i} # Dividi ogni nodo in valori separati da ";" node_values = node_data.split(';') for j, value in enumerate(node_values, start=0): # Imposta i valori a -9999 se trovi "Dis." node_dict['val' + str(j)] = -9999 if value == "Dis." else float(value) node_list.append(node_dict) # Prepara i dati per l'inserimento/aggiornamento record = { "nome_unit": nome_unit.upper(), "tipo_centralina": tipo_centralina, "nome_tool": nome_tool.upper(), "tipo_tool": tipo_tool, "ip_centralina": ip_centralina, "ip_gateway": ip_gateway, "event_timestamp": timestamp, "battery_level": float(measurements.split(';')[0]), "temperature": float(measurements.split(';')[1]), "nodes_jsonb": json.dumps(node_list) # Converti la lista di dizionari in una stringa JSON } records.append(record) # Se abbiamo raggiunto 1000 record, esegui l'inserimento in batch if len(records) >= 500: print("raggiunti 500 record scrivo sul db") write_db(engine, records) records = [] write_db(engine, records) print("Tutte le righe del file sono state caricate con successo nella tabella PostgreSQL!")