2311
This commit is contained in:
140
transform_file.py
Normal file
140
transform_file.py
Normal file
@@ -0,0 +1,140 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
import json
|
||||
import psycopg2
|
||||
|
||||
import logging
|
||||
|
||||
from sqlalchemy import create_engine, text
|
||||
from utils.time import timestamp_fmt as ts
|
||||
from utils.config import set_config as setting
|
||||
|
||||
def write_db(engine, records):
|
||||
with engine.connect() as conn:
|
||||
conn.execute(text("""
|
||||
INSERT INTO dataraw (nome_unit, tipo_centralina, nome_tool, tipo_tool, ip_centralina, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb)
|
||||
VALUES
|
||||
""" + ",".join([
|
||||
f"(:{i}_nome_unit, :{i}_tipo_centralina, :{i}_nome_tool, :{i}_tipo_tool, :{i}_ip_centralina, :{i}_ip_gateway, :{i}_event_timestamp, :{i}_battery_level, :{i}_temperature, :{i}_nodes_jsonb)"
|
||||
for i in range(len(records))
|
||||
]) + """
|
||||
ON CONFLICT ON CONSTRAINT dataraw_unique
|
||||
DO UPDATE SET
|
||||
tipo_centralina = EXCLUDED.tipo_centralina,
|
||||
tipo_tool = EXCLUDED.tipo_tool,
|
||||
ip_centralina = EXCLUDED.ip_centralina,
|
||||
ip_gateway = EXCLUDED.ip_gateway,
|
||||
battery_level = EXCLUDED.battery_level,
|
||||
temperature = EXCLUDED.temperature,
|
||||
nodes_jsonb = EXCLUDED.nodes_jsonb;
|
||||
"""), {f"{i}_{key}": value for i, record in enumerate(records) for key, value in record.items()})
|
||||
|
||||
conn.commit()
|
||||
|
||||
def elab_csv(engine, cfg):
|
||||
# Leggi il file intero e separa l'intestazione dal resto dei dati
|
||||
with open('DT0029_20241106044856.csv', 'r') as file:
|
||||
lines = file.readlines()
|
||||
|
||||
# Estrarre le informazioni dalle prime 7 righe
|
||||
if len(lines) >= cfg.header(G801):
|
||||
tipo_centralina = lines[1].split()[0] # Prima stringa nella seconda riga
|
||||
nome_unit = lines[1].split()[1] # Seconda stringa nella seconda riga
|
||||
ip_centralina = lines[2].split()[1] # IP della centralina dalla terza riga
|
||||
ip_gateway = lines[4].split()[1] # IP del gateway dalla quinta riga
|
||||
path_tool = lines[5].strip() # Path completo dalla sesta riga
|
||||
nome_tool = path_tool.split('/')[-1].replace('.csv', '') # Ultima parte del percorso senza estensione
|
||||
tipo_tool = path_tool.split('/')[-2] # Parte precedente al nome_tool
|
||||
|
||||
else:
|
||||
|
||||
logging.info(f'Il file non contiene abbastanza righe per estrarre i dati richiesti.')
|
||||
raise ValueError("Il file non contiene abbastanza righe per estrarre i dati richiesti.")
|
||||
|
||||
records = []
|
||||
# Elabora le righe dei dati a partire dalla riga 8 in poi
|
||||
for line in lines[7:]:
|
||||
# Rimuovi spazi bianchi o caratteri di nuova riga
|
||||
input_data = line.strip()
|
||||
|
||||
# Suddividi la stringa in sezioni usando ";|;" come separatore
|
||||
parts = input_data.split(';|;')
|
||||
|
||||
# Verifica che ci siano almeno tre parti (timestamp, misure e nodi)
|
||||
if len(parts) < 3:
|
||||
print(f"Riga non valida: {input_data}")
|
||||
continue
|
||||
|
||||
# Estrai la data/ora e le prime misurazioni
|
||||
timestamp = parts[0]
|
||||
measurements = parts[1]
|
||||
|
||||
# Estrai i valori di ciascun nodo e formatta i dati come JSON
|
||||
nodes = parts[2:]
|
||||
node_list = []
|
||||
for i, node_data in enumerate(nodes, start=1):
|
||||
node_dict = {"num": i}
|
||||
# Dividi ogni nodo in valori separati da ";"
|
||||
node_values = node_data.split(';')
|
||||
for j, value in enumerate(node_values, start=0):
|
||||
# Imposta i valori a -9999 se trovi "Dis."
|
||||
node_dict['val' + str(j)] = -9999 if value == "Dis." else float(value)
|
||||
node_list.append(node_dict)
|
||||
|
||||
# Prepara i dati per l'inserimento/aggiornamento
|
||||
record = {
|
||||
"nome_unit": nome_unit.upper(),
|
||||
"tipo_centralina": tipo_centralina,
|
||||
"nome_tool": nome_tool.upper(),
|
||||
"tipo_tool": tipo_tool,
|
||||
"ip_centralina": ip_centralina,
|
||||
"ip_gateway": ip_gateway,
|
||||
"event_timestamp": timestamp,
|
||||
"battery_level": float(measurements.split(';')[0]),
|
||||
"temperature": float(measurements.split(';')[1]),
|
||||
"nodes_jsonb": json.dumps(node_list) # Converti la lista di dizionari in una stringa JSON
|
||||
}
|
||||
|
||||
records.append(record)
|
||||
|
||||
# Se abbiamo raggiunto 500 record, esegui l'inserimento in batch
|
||||
if len(records) >= 500:
|
||||
print("raggiunti 500 record scrivo sul db")
|
||||
write_db(engine, records)
|
||||
records = []
|
||||
|
||||
write_db(engine, records)
|
||||
|
||||
|
||||
def main():
|
||||
# Load the configuration settings
|
||||
cfg = setting.config()
|
||||
|
||||
try:
|
||||
# Configura la connessione al database PostgreSQL
|
||||
engine = create_engine(f'postgresql://{cfg.dbuser}:{cfg.dbpass}@{cfg.dbhost}:{cfg.dbport}/{cfg.dbschema}')
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
format="%(asctime)s %(message)s",
|
||||
filename=cfg.logfilename,
|
||||
level=logging.INFO,
|
||||
)
|
||||
elab_csv(engine, cfg)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
logging.info(
|
||||
"Info: {}.".format("Shutdown requested...exiting")
|
||||
)
|
||||
|
||||
except Exception:
|
||||
print(
|
||||
"{} - PID {:>5} >> Error: {}.".format(
|
||||
ts.timestamp("log"), os.getpid(), sys.exc_info()[1]
|
||||
)
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user