From afd028b794977613814ecc7b8fed8d9f77d4b30b Mon Sep 17 00:00:00 2001 From: Alessandro Battilani Date: Sun, 10 Nov 2024 15:58:19 +0100 Subject: [PATCH] prova1 --- transform_file.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/transform_file.py b/transform_file.py index af1ad09..b093b2a 100644 --- a/transform_file.py +++ b/transform_file.py @@ -1,7 +1,7 @@ -import pandas as pd import json import psycopg2 -from sqlalchemy import create_engine +from sqlalchemy import create_engine, text +from sqlalchemy.dialects.postgresql import insert # Configura la connessione al database PostgreSQL engine = create_engine('postgresql://asepg:batt1l0@10.211.114.101:5432/asedb') @@ -41,6 +41,7 @@ for line in lines[7:]: nodes = parts[2:] node_list = [] + records = [] for i, node_data in enumerate(nodes, start=1): # Dividi ogni nodo in valori separati da ";" node_values = node_data.split(';') @@ -58,20 +59,34 @@ for line in lines[7:]: } node_list.append(node_dict) - # Crea il DataFrame con una colonna JSONB per i nodi - df = pd.DataFrame([{ - "event_timestamp": pd.to_datetime(timestamp), + # Prepara i dati per l'inserimento/aggiornamento + record = { "tipo_centralina": tipo_centralina, "unit": unit, "ip_centralina": ip_centralina, "path": path, "ip_gateway": ip_gateway, + "event_timestamp": timestamp, "battery_level": float(measurements.split(';')[0]), "temperature": float(measurements.split(';')[1]), "nodes_jsonb": json.dumps(node_list) # Converti la lista di dizionari in una stringa JSON - }]) + } + records.append(record) - # Carica il DataFrame in una tabella PostgreSQL - df.to_sql('dataraw', engine, if_exists='append', index=False) +# Esegui l'upsert con un comando SQL +with engine.connect() as conn: + with conn.execution_options(isolation_level='AUTOCOMMIT'): + result = conn.execute(text(""" + INSERT INTO dataraw (tipo_centralina, unit, ip_centralina, path, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb) + VALUES (:tipo_centralina, :unit, :ip_centralina, :path, :ip_gateway, :event_timestamp, :battery_level, :temperature, :nodes_jsonb) + ON CONFLICT ON CONSTRAINT dataraw_unique + DO UPDATE SET + path = EXCLUDED.path, + ip_gateway = EXCLUDED.ip_gateway, + battery_level = EXCLUDED.battery_level, + temperature = EXCLUDED.temperature, + nodes_jsonb = EXCLUDED.nodes_jsonb; + """), records) + print(result) print("Tutte le righe del file sono state caricate con successo nella tabella PostgreSQL!")