do first transformation

This commit is contained in:
2024-12-01 11:47:58 +01:00
parent 704bc20456
commit 8b8766c609
7 changed files with 112 additions and 75 deletions

View File

@@ -8,7 +8,6 @@ import re
import logging import logging
import psycopg2 import psycopg2
from psycopg2 import sql
from hashlib import sha256 from hashlib import sha256
from pathlib import Path from pathlib import Path
@@ -102,22 +101,23 @@ class ASEHandler(FTPHandler):
with open(file, 'r') as csvfile: with open(file, 'r') as csvfile:
lines = csvfile.readlines() lines = csvfile.readlines()
units_type = cfg.headers.keys() unit_name = extract_value(cfg.units_name, filename, str(lines[0:9]))
unit_type = extract_value(units_type, filename, str(lines[0:6])) unit_type = extract_value(cfg.units_type, filename, str(lines[0:9]))
unit_name = extract_value(cfg.units_name, filename, str(lines[0:6])) tool_name = extract_value(cfg.tools_name, filename, str(lines[0:9]))
tool_name = extract_value(cfg.tools_name, filename, str(lines[0:6])) tool_type = extract_value(cfg.tools_type, filename, str(lines[0:9]))
tool_type = extract_value(cfg.tools_type, filename, str(lines[0:6]))
conn = conn_db(cfg) conn = conn_db(cfg)
# Crea un cursore # Crea un cursore
cur = conn.cursor() cur = conn.cursor()
try: try:
cur.execute("INSERT INTO received (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s,%s,%s,%s,%s,%s)" , (filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), lines)) cur.execute(f"INSERT INTO {cfg.dbschema}.{cfg.dbrectable } (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s, %s, %s, %s, %s, %s)", (filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), lines))
conn.commit() conn.commit()
conn.close() conn.close()
except:
except Exception as e:
logging.error(f'File {file} not loaded. Held in user path.') logging.error(f'File {file} not loaded. Held in user path.')
logging.error(f'{e}')
else: else:
os.remove(file) os.remove(file)
logging.info(f'File {file} loaded: removed.') logging.info(f'File {file} loaded: removed.')
@@ -161,8 +161,9 @@ class ASEHandler(FTPHandler):
conn.close() conn.close()
logging.info("User {} created.".format(user)) logging.info("User {} created.".format(user))
self.respond('200 SITE ADDU successful.') self.respond('200 SITE ADDU successful.')
except: except Exception as e:
self.respond('501 SITE ADDU failed.') self.respond('501 SITE ADDU failed.')
print(e)
def ftp_SITE_DELU(self, line): def ftp_SITE_DELU(self, line):
""" """

18
dbddl/dataraw.sql Normal file
View File

@@ -0,0 +1,18 @@
DROP TABLE public.dataraw;
CREATE TABLE public.dataraw
(
id serial4 NOT NULL,
unit_name text NULL,
unit_type text NULL,
tool_name text NULL,
tool_type text NULL,
ip_centralina text NULL,
ip_gateway text NULL,
event_timestamp timestamp NULL,
battery_level float8 NULL,
temperature float8 NULL,
nodes_jsonb text NULL,
CONSTRAINT dataraw_pk PRIMARY KEY (id),
CONSTRAINT dataraw_unique UNIQUE (unit_name, tool_name, event_timestamp)
);

View File

@@ -9,6 +9,7 @@ CREATE TABLE public.received
tool_name text NULL, tool_name text NULL,
tool_type text NULL, tool_type text NULL,
tool_data text NULL, tool_data text NULL,
"locked" int2 DEFAULT 0 NULL,
status int2 DEFAULT 0 NULL, status int2 DEFAULT 0 NULL,
created_at timestamptz DEFAULT CURRENT_TIMESTAMP NULL, created_at timestamptz DEFAULT CURRENT_TIMESTAMP NULL,
loaded_at timestamptz NULL, loaded_at timestamptz NULL,

View File

@@ -1,3 +1,5 @@
DROP TABLE public.virtusers
CREATE TABLE public.virtusers CREATE TABLE public.virtusers
( (
id serial4 NOT NULL, id serial4 NOT NULL,

View File

@@ -30,9 +30,12 @@
recTableName = received recTableName = received
[unit] [unit]
Headers = G801:7|G201:0|G301:0|G802:7|D2W:0|GFLOW:0|measurement:0|CR1000X:0 Types = G801|G201|G301|G802|D2W|GFLOW|CR1000X|TLP|GS1
Names = ID[0-9]{4}|IX[0-9]{4} Names = ID[0-9]{4}|IX[0-9]{4}
[tool] [tool]
Types = MUX|MUMS|MODB|IPTM|MUSA Types = MUX|MUMS|MODB|IPTM|MUSA|LOC|GD|D2W|CR1000X|G301|NESA
Names = LOC[0-9]{4}|DT[0-9]{4} Names = LOC[0-9]{4}|DT[0-9]{4}|GD[0-9]{4}|[0-9]{18}|measurement
[csv]
Infos = IP|Subnet|Gateway|Web port|Ftp port

132
transform_file.py Normal file → Executable file
View File

@@ -3,6 +3,8 @@
import sys import sys
import os import os
import re
import json import json
import psycopg2 import psycopg2
@@ -12,19 +14,30 @@ from sqlalchemy import create_engine, text
from utils.time import timestamp_fmt as ts from utils.time import timestamp_fmt as ts
from utils.config import set_config as setting from utils.config import set_config as setting
def extract_value(patterns, source, default='Not Defined'):
ip = {}
for pattern in patterns:
pattern = f'r"{pattern}:\s*(\d{1,3}(?:\.\d{1,3}){3})"'
matches = re.search(pattern, source, re.IGNORECASE)
if matches:
ip.update({pattern: matches.group(1)})
else:
ip.update({pattern: default})
return ip
def write_db(engine, records): def write_db(engine, records):
with engine.connect() as conn: with engine.connect() as conn:
conn.execute(text(""" conn.execute(text("""
INSERT INTO dataraw (nome_unit, tipo_centralina, nome_tool, tipo_tool, ip_centralina, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb) INSERT INTO dataraw (unit_name, unit_type, tool_name, tool_type, ip_centralina, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb)
VALUES VALUES
""" + ",".join([ """ + ",".join([
f"(:{i}_nome_unit, :{i}_tipo_centralina, :{i}_nome_tool, :{i}_tipo_tool, :{i}_ip_centralina, :{i}_ip_gateway, :{i}_event_timestamp, :{i}_battery_level, :{i}_temperature, :{i}_nodes_jsonb)" f"(:{i}_unit_name, :{i}_unit_type, :{i}_tool_name, :{i}_tool_type, :{i}_ip_centralina, :{i}_ip_gateway, :{i}_event_timestamp, :{i}_battery_level, :{i}_temperature, :{i}_nodes_jsonb)"
for i in range(len(records)) for i in range(len(records))
]) + """ ]) + """
ON CONFLICT ON CONSTRAINT dataraw_unique ON CONFLICT ON CONSTRAINT dataraw_unique
DO UPDATE SET DO UPDATE SET
tipo_centralina = EXCLUDED.tipo_centralina, unit_type = EXCLUDED.unit_type,
tipo_tool = EXCLUDED.tipo_tool, tool_type = EXCLUDED.tool_type,
ip_centralina = EXCLUDED.ip_centralina, ip_centralina = EXCLUDED.ip_centralina,
ip_gateway = EXCLUDED.ip_gateway, ip_gateway = EXCLUDED.ip_gateway,
battery_level = EXCLUDED.battery_level, battery_level = EXCLUDED.battery_level,
@@ -35,76 +48,71 @@ def write_db(engine, records):
conn.commit() conn.commit()
def elab_csv(engine, cfg): def elab_csv(engine, cfg):
# Leggi il file intero e separa l'intestazione dal resto dei dati with engine.connect() as conn:
with open('DT0029_20241106044856.csv', 'r') as file: cur = conn.cursor()
lines = file.readlines() cur.execute(f'select unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbrectable } r ')
# Estrarre le informazioni dalle prime 7 righe unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone()
if len(lines) >= cfg.header('G801'): data_list = str(tool_data).strip("('{\"").strip("\"}\',)").split('","')
tipo_centralina = lines[1].split()[0] # Prima stringa nella seconda riga
nome_unit = lines[1].split()[1] # Seconda stringa nella seconda riga
ip_centralina = lines[2].split()[1] # IP della centralina dalla terza riga
ip_gateway = lines[4].split()[1] # IP del gateway dalla quinta riga
path_tool = lines[5].strip() # Path completo dalla sesta riga
nome_tool = path_tool.split('/')[-1].replace('.csv', '') # Ultima parte del percorso senza estensione
tipo_tool = path_tool.split('/')[-2] # Parte precedente al nome_tool
else: # Estrarre le informazioni degli ip dalla header
infos = extract_value(cfg.csv_infos, data_list[:9])
logging.info(f'Il file non contiene abbastanza righe per estrarre i dati richiesti.')
raise ValueError("Il file non contiene abbastanza righe per estrarre i dati richiesti.")
records = [] records = []
# Elabora le righe dei dati a partire dalla riga 8 in poi # Elabora le righe dei dati a partire dalla riga 8 in poi
for line in lines[7:]: for line in data_list:
# Rimuovi spazi bianchi o caratteri di nuova riga if ";|;" in line:
input_data = line.strip() # Rimuovi spazi bianchi o caratteri di nuova riga
input_data = line.strip().replace('\\n', '')
# Suddividi la stringa in sezioni usando ";|;" come separatore # Suddividi la stringa in sezioni usando ";|;" come separatore
parts = input_data.split(';|;') parts = input_data.split(';|;')
# Verifica che ci siano almeno tre parti (timestamp, misure e nodi) # Verifica che ci siano almeno tre parti (timestamp, misure e nodi)
if len(parts) < 3: if len(parts) < 3:
print(f"Riga non valida: {input_data}") print(f"Riga non valida: {input_data}")
continue continue
# Estrai la data/ora e le prime misurazioni # Estrai la data/ora e le prime misurazioni
timestamp = parts[0] timestamp = parts[0]
measurements = parts[1] measurements = parts[1]
# Estrai i valori di ciascun nodo e formatta i dati come JSON # Estrai i valori di ciascun nodo e formatta i dati come JSON
nodes = parts[2:] nodes = parts[2:]
node_list = [] node_list = []
for i, node_data in enumerate(nodes, start=1): for i, node_data in enumerate(nodes, start=1):
node_dict = {"num": i} node_dict = {"num": i}
# Dividi ogni nodo in valori separati da ";" # Dividi ogni nodo in valori separati da ";"
node_values = node_data.split(';') node_values = node_data.split(';')
for j, value in enumerate(node_values, start=0): for j, value in enumerate(node_values, start=0):
# Imposta i valori a -9999 se trovi "Dis." # Imposta i valori a -9999 se trovi "Dis."
node_dict['val' + str(j)] = -9999 if value == "Dis." else float(value) node_dict['val' + str(j)] = -9999 if value == "Dis." else float(value)
node_list.append(node_dict) node_list.append(node_dict)
# Prepara i dati per l'inserimento/aggiornamento # Prepara i dati per l'inserimento/aggiornamento
record = { record = {
"nome_unit": nome_unit.upper(), "unit_name": unit_name.upper(),
"tipo_centralina": tipo_centralina, "unit_type": unit_type.upper(),
"nome_tool": nome_tool.upper(), "tool_name": tool_name.upper(),
"tipo_tool": tipo_tool, "tool_type": tool_type.upper(),
"ip_centralina": ip_centralina, "ip_centralina": infos['IP'],
"ip_gateway": ip_gateway, "ip_subnet": infos['Subnet'],
"event_timestamp": timestamp, "ip_gateway": infos['Gateway'],
"battery_level": float(measurements.split(';')[0]), "Web_port": infos['Web port'],
"temperature": float(measurements.split(';')[1]), "Ftp_port": infos['Ftp port'],
"nodes_jsonb": json.dumps(node_list) # Converti la lista di dizionari in una stringa JSON "event_timestamp": timestamp,
} "battery_level": float(measurements.split(';')[0]),
"temperature": float(measurements.split(';')[1]),
"nodes_jsonb": json.dumps(node_list) # Converti la lista di dizionari in una stringa JSON
}
records.append(record) records.append(record)
# Se abbiamo raggiunto 500 record, esegui l'inserimento in batch # Se abbiamo raggiunto 500 record, esegui l'inserimento in batch
if len(records) >= 500: if len(records) >= 500:
print("raggiunti 500 record scrivo sul db") print("raggiunti 500 record scrivo sul db")
write_db(engine, records) write_db(engine, records)
records = [] records = []
write_db(engine, records) write_db(engine, records)

View File

@@ -38,9 +38,13 @@ class config:
self.dbrectable = c.get("db", "recTableName") self.dbrectable = c.get("db", "recTableName")
# unit setting # unit setting
self.headers = {key: int(value) for pair in c.get("unit", "Headers").split('|') for key, value in [pair.split(':')]}
self.units_name = [part for part in c.get("unit", "Names").split('|')] self.units_name = [part for part in c.get("unit", "Names").split('|')]
self.units_type = [part for part in c.get("unit", "Types").split('|')]
#self.units_header = {key: int(value) for pair in c.get("unit", "Headers").split('|') for key, value in [pair.split(':')]}
# tool setting # tool setting
self.tools_name = [part for part in c.get("tool", "Names").split('|')] self.tools_name = [part for part in c.get("tool", "Names").split('|')]
self.tools_type = [part for part in c.get("tool", "Types").split('|')] self.tools_type = [part for part in c.get("tool", "Types").split('|')]
# csv info
self.csv_infos = [part for part in c.get("csv", "Infos").split('|')]