diff --git a/transform_file.py b/transform_file.py index b7fc1a7..de002cd 100644 --- a/transform_file.py +++ b/transform_file.py @@ -1,11 +1,33 @@ import json import psycopg2 from sqlalchemy import create_engine, text -from sqlalchemy.dialects.postgresql import insert # Configura la connessione al database PostgreSQL engine = create_engine('postgresql://asepg:batt1l0@10.211.114.101:5432/asedb') +def write_db(engine, records): + with engine.connect() as conn: + conn.execute(text(""" + INSERT INTO dataraw (nome_unit, tipo_centralina, nome_tool, tipo_tool, ip_centralina, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb) + VALUES + """ + ",".join([ + f"(:{i}_nome_unit, :{i}_tipo_centralina, :{i}_nome_tool, :{i}_tipo_tool, :{i}_ip_centralina, :{i}_ip_gateway, :{i}_event_timestamp, :{i}_battery_level, :{i}_temperature, :{i}_nodes_jsonb)" + for i in range(len(records)) + ]) + """ + ON CONFLICT ON CONSTRAINT dataraw_unique + DO UPDATE SET + tipo_centralina = EXCLUDED.tipo_centralina, + tipo_tool = EXCLUDED.tipo_tool, + ip_centralina = EXCLUDED.ip_centralina, + ip_gateway = EXCLUDED.ip_gateway, + battery_level = EXCLUDED.battery_level, + temperature = EXCLUDED.temperature, + nodes_jsonb = EXCLUDED.nodes_jsonb; + """), {f"{i}_{key}": value for i, record in enumerate(records) for key, value in record.items()}) + + conn.commit() + + # Leggi il file intero e separa l'intestazione dal resto dei dati with open('DT0029_20241106044856.csv', 'r') as file: lines = file.readlines() @@ -13,10 +35,13 @@ with open('DT0029_20241106044856.csv', 'r') as file: # Estrarre le informazioni dalle prime 7 righe if len(lines) >= 7: tipo_centralina = lines[1].split()[0] # Prima stringa nella seconda riga - unit = lines[1].split()[1] # Seconda stringa nella seconda riga + nome_unit = lines[1].split()[1] # Seconda stringa nella seconda riga ip_centralina = lines[2].split()[1] # IP della centralina dalla terza riga ip_gateway = lines[4].split()[1] # IP del gateway dalla quinta riga - path = lines[5].split()[2] # Path dalla quinta riga + path_tool = lines[5].strip() # Path completo dalla sesta riga + nome_tool = path_tool.split('/')[-1].replace('.csv', '') # Ultima parte del percorso senza estensione + tipo_tool = path_tool.split('/')[-2] # Parte precedente al nome_tool + else: raise ValueError("Il file non contiene abbastanza righe per estrarre i dati richiesti.") @@ -40,31 +65,23 @@ for line in lines[7:]: # Estrai i valori di ciascun nodo e formatta i dati come JSON nodes = parts[2:] - node_list = [] for i, node_data in enumerate(nodes, start=1): + node_dict = {"num": i} # Dividi ogni nodo in valori separati da ";" node_values = node_data.split(';') - - # Imposta i valori a -1 se trovi "Dis.", altrimenti convertili in float - val1 = -9999 if node_values[0] == "Dis." else float(node_values[0]) - val2 = -9999 if len(node_values) > 1 and node_values[1] == "Dis." else ( - float(node_values[1]) if len(node_values) > 1 else None - ) - - node_dict = { - "num": i, - "val1": val1, - "val2": val2 - } + for j, value in enumerate(node_values, start=0): + # Imposta i valori a -9999 se trovi "Dis." + node_dict['val' + str(j)] = -9999 if value == "Dis." else float(value) node_list.append(node_dict) # Prepara i dati per l'inserimento/aggiornamento record = { + "nome_unit": nome_unit.upper(), "tipo_centralina": tipo_centralina, - "unit": unit, + "nome_tool": nome_tool.upper(), + "tipo_tool": tipo_tool, "ip_centralina": ip_centralina, - "path": path, "ip_gateway": ip_gateway, "event_timestamp": timestamp, "battery_level": float(measurements.split(';')[0]), @@ -74,23 +91,12 @@ for line in lines[7:]: records.append(record) -with engine.connect() as conn: - conn.execute(text(""" - INSERT INTO dataraw (tipo_centralina, unit, ip_centralina, path, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb) - VALUES - """ + ",".join([ - f"(:{i}_tipo_centralina, :{i}_unit, :{i}_ip_centralina, :{i}_path, :{i}_ip_gateway, :{i}_event_timestamp, :{i}_battery_level, :{i}_temperature, :{i}_nodes_jsonb)" - for i in range(len(records)) - ]) + """ - ON CONFLICT ON CONSTRAINT dataraw_unique - DO UPDATE SET - path = EXCLUDED.path, - ip_gateway = EXCLUDED.ip_gateway, - battery_level = EXCLUDED.battery_level, - temperature = EXCLUDED.temperature, - nodes_jsonb = EXCLUDED.nodes_jsonb; - """), {f"{i}_{key}": value for i, record in enumerate(records) for key, value in record.items()}) + # Se abbiamo raggiunto 1000 record, esegui l'inserimento in batch + if len(records) >= 500: + print("raggiunti 500 record scrivo sul db") + write_db(engine, records) + records = [] - conn.commit() +write_db(engine, records) print("Tutte le righe del file sono state caricate con successo nella tabella PostgreSQL!")