diff --git a/FtpCsvReceiver.py b/FtpCsvReceiver.py
index 3a4f9ad..f70579f 100755
--- a/FtpCsvReceiver.py
+++ b/FtpCsvReceiver.py
@@ -8,7 +8,6 @@
 import re
 import logging
 import psycopg2
-from psycopg2 import sql
 from hashlib import sha256
 from pathlib import Path
@@ -102,22 +101,23 @@ class ASEHandler(FTPHandler):
         with open(file, 'r') as csvfile:
             lines = csvfile.readlines()
-        units_type = cfg.headers.keys()
-        unit_type = extract_value(units_type, filename, str(lines[0:6]))
-        unit_name = extract_value(cfg.units_name, filename, str(lines[0:6]))
-        tool_name = extract_value(cfg.tools_name, filename, str(lines[0:6]))
-        tool_type = extract_value(cfg.tools_type, filename, str(lines[0:6]))
+        unit_name = extract_value(cfg.units_name, filename, str(lines[0:9]))
+        unit_type = extract_value(cfg.units_type, filename, str(lines[0:9]))
+        tool_name = extract_value(cfg.tools_name, filename, str(lines[0:9]))
+        tool_type = extract_value(cfg.tools_type, filename, str(lines[0:9]))
         conn = conn_db(cfg)
         # Crea un cursore
         cur = conn.cursor()
         try:
-            cur.execute("INSERT INTO received (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s,%s,%s,%s,%s,%s)", (filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), lines))
+            cur.execute(f"INSERT INTO {cfg.dbschema}.{cfg.dbrectable} (filename, unit_name, unit_type, tool_name, tool_type, tool_data) VALUES (%s, %s, %s, %s, %s, %s)", (filename, unit_name.upper(), unit_type.upper(), tool_name.upper(), tool_type.upper(), lines))
             conn.commit()
             conn.close()
-        except:
+
+        except Exception as e:
             logging.error(f'File {file} not loaded. Held in user path.')
+            logging.error(e)
         else:
             os.remove(file)
             logging.info(f'File {file} loaded: removed.')
@@ -161,8 +161,9 @@ class ASEHandler(FTPHandler):
             conn.close()
             logging.info("User {} created.".format(user))
             self.respond('200 SITE ADDU successful.')
-        except:
+        except Exception as e:
             self.respond('501 SITE ADDU failed.')
+            logging.error(e)
 
     def ftp_SITE_DELU(self, line):
         """
diff --git a/dbddl/dataraw.sql b/dbddl/dataraw.sql
new file mode 100644
index 0000000..b486800
--- /dev/null
+++ b/dbddl/dataraw.sql
@@ -0,0 +1,18 @@
+DROP TABLE IF EXISTS public.dataraw;
+
+CREATE TABLE public.dataraw
+(
+    id serial4 NOT NULL,
+    unit_name text NULL,
+    unit_type text NULL,
+    tool_name text NULL,
+    tool_type text NULL,
+    ip_centralina text NULL,
+    ip_gateway text NULL,
+    event_timestamp timestamp NULL,
+    battery_level float8 NULL,
+    temperature float8 NULL,
+    nodes_jsonb text NULL,
+    CONSTRAINT dataraw_pk PRIMARY KEY (id),
+    CONSTRAINT dataraw_unique UNIQUE (unit_name, tool_name, event_timestamp)
+);
\ No newline at end of file
diff --git a/dbddl/received.sql b/dbddl/received.sql
index 47e2010..2c60faf 100644
--- a/dbddl/received.sql
+++ b/dbddl/received.sql
@@ -9,6 +9,7 @@ CREATE TABLE public.received
     tool_name text NULL,
     tool_type text NULL,
     tool_data text NULL,
+    "locked" int2 DEFAULT 0 NULL,
     status int2 DEFAULT 0 NULL,
     created_at timestamptz DEFAULT CURRENT_TIMESTAMP NULL,
     loaded_at timestamptz NULL,
diff --git a/dbddl/virtusers.sql b/dbddl/virtusers.sql
index 930da41..ae5b237 100644
--- a/dbddl/virtusers.sql
+++ b/dbddl/virtusers.sql
@@ -1,3 +1,5 @@
+DROP TABLE IF EXISTS public.virtusers;
+
 CREATE TABLE public.virtusers
 (
     id serial4 NOT NULL,
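Note on the interpolated table name in FtpCsvReceiver.py: cfg.dbschema and cfg.dbrectable come from the trusted ini file, so the f-string is workable, but the psycopg2.sql helpers whose import this diff removes can compose the identifiers safely if that ever changes. A minimal sketch, not part of this diff:

```python
from psycopg2 import sql

query = sql.SQL(
    "INSERT INTO {}.{} (filename, unit_name, unit_type, tool_name, tool_type, tool_data) "
    "VALUES (%s, %s, %s, %s, %s, %s)"
).format(sql.Identifier(cfg.dbschema), sql.Identifier(cfg.dbrectable))
cur.execute(query, (filename, unit_name.upper(), unit_type.upper(),
                    tool_name.upper(), tool_type.upper(), lines))
```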
diff --git a/ftpcsvreceiver.ini b/ftpcsvreceiver.ini
index 18559ad..7ccf9d4 100644
--- a/ftpcsvreceiver.ini
+++ b/ftpcsvreceiver.ini
@@ -30,9 +30,12 @@ recTableName = received
 
 [unit]
-    Headers = G801:7|G201:0|G301:0|G802:7|D2W:0|GFLOW:0|measurement:0|CR1000X:0
+    Types = G801|G201|G301|G802|D2W|GFLOW|CR1000X|TLP|GS1
     Names = ID[0-9]{4}|IX[0-9]{4}
 
 [tool]
-    Types = MUX|MUMS|MODB|IPTM|MUSA
-    Names = LOC[0-9]{4}|DT[0-9]{4}
+    Types = MUX|MUMS|MODB|IPTM|MUSA|LOC|GD|D2W|CR1000X|G301|NESA
+    Names = LOC[0-9]{4}|DT[0-9]{4}|GD[0-9]{4}|[0-9]{18}|measurement
+
+[csv]
+    Infos = IP|Subnet|Gateway|Web port|Ftp port
diff --git a/transform_file.py b/transform_file.py
old mode 100644
new mode 100755
index d71e885..3ea8870
--- a/transform_file.py
+++ b/transform_file.py
@@ -3,6 +3,8 @@
 import sys
 import os
 
+import re
+
 import json
 import psycopg2
@@ -12,19 +14,30 @@
 from sqlalchemy import create_engine, text
 
 from utils.time import timestamp_fmt as ts
 from utils.config import set_config as setting
 
+def extract_value(patterns, source, default='Not Defined'):
+    # Map each configured label ('IP', 'Gateway', ...) to the value found in
+    # the header, so callers can look results up by name.
+    infos = {}
+    text_src = ' '.join(source) if isinstance(source, list) else str(source)
+    for label in patterns:
+        # Match 'Label: value', where value is an IPv4 address or a bare port number
+        match = re.search(rf'{re.escape(label)}:\s*([\d.]+)', text_src, re.IGNORECASE)
+        infos[label] = match.group(1) if match else default
+    return infos
+
 def write_db(engine, records):
     with engine.connect() as conn:
         conn.execute(text("""
-            INSERT INTO dataraw (nome_unit, tipo_centralina, nome_tool, tipo_tool, ip_centralina, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb)
+            INSERT INTO dataraw (unit_name, unit_type, tool_name, tool_type, ip_centralina, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb)
             VALUES
         """ + ",".join([
-            f"(:{i}_nome_unit, :{i}_tipo_centralina, :{i}_nome_tool, :{i}_tipo_tool, :{i}_ip_centralina, :{i}_ip_gateway, :{i}_event_timestamp, :{i}_battery_level, :{i}_temperature, :{i}_nodes_jsonb)"
+            f"(:{i}_unit_name, :{i}_unit_type, :{i}_tool_name, :{i}_tool_type, :{i}_ip_centralina, :{i}_ip_gateway, :{i}_event_timestamp, :{i}_battery_level, :{i}_temperature, :{i}_nodes_jsonb)"
             for i in range(len(records))
         ]) + """
             ON CONFLICT ON CONSTRAINT dataraw_unique DO UPDATE SET
-                tipo_centralina = EXCLUDED.tipo_centralina,
-                tipo_tool = EXCLUDED.tipo_tool,
+                unit_type = EXCLUDED.unit_type,
+                tool_type = EXCLUDED.tool_type,
                 ip_centralina = EXCLUDED.ip_centralina,
                 ip_gateway = EXCLUDED.ip_gateway,
                 battery_level = EXCLUDED.battery_level,
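A quick sanity check of the reworked extract_value (the header lines here are invented to mirror the [csv] Infos labels):

```python
header = ['IP: 192.168.1.10', 'Subnet: 255.255.255.0', 'Gateway: 192.168.1.1',
          'Web port: 80', 'Ftp port: 21']
extract_value(['IP', 'Subnet', 'Gateway', 'Web port', 'Ftp port'], header)
# -> {'IP': '192.168.1.10', 'Subnet': '255.255.255.0',
#     'Gateway': '192.168.1.1', 'Web port': '80', 'Ftp port': '21'}
```

One caveat: write_db builds its VALUES clause from len(records), so an empty list yields invalid SQL, and the final write_db(engine, records) call in elab_csv below can hit this when the row count is an exact multiple of 500. A possible guard, sketched here and not included in this diff:

```python
def write_db(engine, records):
    if not records:  # nothing to flush; avoids an empty VALUES clause
        return
    ...
```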
@@ -35,76 +48,71 @@ def write_db(engine, records):
     conn.commit()
 
 def elab_csv(engine, cfg):
-    # Leggi il file intero e separa l'intestazione dal resto dei dati
-    with open('DT0029_20241106044856.csv', 'r') as file:
-        lines = file.readlines()
+    with engine.connect() as conn:
+        # SQLAlchemy connections have no .cursor(); run the query through
+        # conn.execute(text(...)) and fetch from the result instead
+        result = conn.execute(text(f'SELECT unit_name, unit_type, tool_name, tool_type, tool_data FROM {cfg.dbrectable} r'))
+        unit_name, unit_type, tool_name, tool_type, tool_data = result.fetchone()
+        # tool_data is the stringified list of CSV lines stored by the receiver
+        data_list = str(tool_data).strip("('{\"").strip("\"}\',)").split('","')
 
-    # Estrarre le informazioni dalle prime 7 righe
-    if len(lines) >= cfg.header('G801'):
-        tipo_centralina = lines[1].split()[0]  # Prima stringa nella seconda riga
-        nome_unit = lines[1].split()[1]  # Seconda stringa nella seconda riga
-        ip_centralina = lines[2].split()[1]  # IP della centralina dalla terza riga
-        ip_gateway = lines[4].split()[1]  # IP del gateway dalla quinta riga
-        path_tool = lines[5].strip()  # Path completo dalla sesta riga
-        nome_tool = path_tool.split('/')[-1].replace('.csv', '')  # Ultima parte del percorso senza estensione
-        tipo_tool = path_tool.split('/')[-2]  # Parte precedente al nome_tool
-    else:
-
-        logging.info(f'Il file non contiene abbastanza righe per estrarre i dati richiesti.')
-        raise ValueError("Il file non contiene abbastanza righe per estrarre i dati richiesti.")
+    # Extract the IP/port info from the header lines
+    infos = extract_value(cfg.csv_infos, data_list[:9])
 
     records = []
 
     # Elabora le righe dei dati a partire dalla riga 8 in poi
-    for line in lines[7:]:
-        # Rimuovi spazi bianchi o caratteri di nuova riga
-        input_data = line.strip()
+    for line in data_list:
+        if ";|;" in line:
+            # Strip whitespace and escaped newline characters
+            input_data = line.strip().replace('\\n', '')
 
-        # Suddividi la stringa in sezioni usando ";|;" come separatore
-        parts = input_data.split(';|;')
+            # Split the string into sections on the ";|;" separator
+            parts = input_data.split(';|;')
 
-        # Verifica che ci siano almeno tre parti (timestamp, misure e nodi)
-        if len(parts) < 3:
-            print(f"Riga non valida: {input_data}")
-            continue
+            # Require at least three parts (timestamp, measurements, nodes)
+            if len(parts) < 3:
+                print(f"Invalid row: {input_data}")
+                continue
 
-        # Estrai la data/ora e le prime misurazioni
-        timestamp = parts[0]
-        measurements = parts[1]
+            # Extract the timestamp and the lead measurements
+            timestamp = parts[0]
+            measurements = parts[1]
 
-        # Estrai i valori di ciascun nodo e formatta i dati come JSON
-        nodes = parts[2:]
-        node_list = []
-        for i, node_data in enumerate(nodes, start=1):
-            node_dict = {"num": i}
-            # Dividi ogni nodo in valori separati da ";"
-            node_values = node_data.split(';')
-            for j, value in enumerate(node_values, start=0):
-                # Imposta i valori a -9999 se trovi "Dis."
-                node_dict['val' + str(j)] = -9999 if value == "Dis." else float(value)
-            node_list.append(node_dict)
+            # Extract each node's values and format them as JSON
+            nodes = parts[2:]
+            node_list = []
+            for i, node_data in enumerate(nodes, start=1):
+                node_dict = {"num": i}
+                # Split each node into ";"-separated values
+                node_values = node_data.split(';')
+                for j, value in enumerate(node_values, start=0):
+                    # Use -9999 as the sentinel wherever "Dis." appears
+                    node_dict['val' + str(j)] = -9999 if value == "Dis." else float(value)
+                node_list.append(node_dict)
 
-        # Prepara i dati per l'inserimento/aggiornamento
-        record = {
-            "nome_unit": nome_unit.upper(),
-            "tipo_centralina": tipo_centralina,
-            "nome_tool": nome_tool.upper(),
-            "tipo_tool": tipo_tool,
-            "ip_centralina": ip_centralina,
-            "ip_gateway": ip_gateway,
-            "event_timestamp": timestamp,
-            "battery_level": float(measurements.split(';')[0]),
-            "temperature": float(measurements.split(';')[1]),
-            "nodes_jsonb": json.dumps(node_list)  # Converti la lista di dizionari in una stringa JSON
-        }
+            # Prepare the data for the insert/update
+            record = {
+                "unit_name": unit_name.upper(),
+                "unit_type": unit_type.upper(),
+                "tool_name": tool_name.upper(),
+                "tool_type": tool_type.upper(),
+                "ip_centralina": infos['IP'],
+                "ip_subnet": infos['Subnet'],
+                "ip_gateway": infos['Gateway'],
+                "web_port": infos['Web port'],  # not (yet) a dataraw column
+                "ftp_port": infos['Ftp port'],  # not (yet) a dataraw column
+                "event_timestamp": timestamp,
+                "battery_level": float(measurements.split(';')[0]),
+                "temperature": float(measurements.split(';')[1]),
+                "nodes_jsonb": json.dumps(node_list)  # serialize the node dicts as a JSON string
+            }
 
-        records.append(record)
+            records.append(record)
 
-        # Se abbiamo raggiunto 500 record, esegui l'inserimento in batch
-        if len(records) >= 500:
-            print("raggiunti 500 record scrivo sul db")
-            write_db(engine, records)
-            records = []
+            # Flush the batch once 500 records have accumulated
+            if len(records) >= 500:
+                print("reached 500 records, writing to db")
+                write_db(engine, records)
+                records = []
 
     write_db(engine, records)
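For reference, elab_csv expects each data row in tool_data to use the ';|;' layout: timestamp, then battery/temperature, then one section per node. A made-up row (all values hypothetical) and the mapping it produces:

```python
line = '2024-11-06 04:48:56;|;12.4;21.7;|;0.12;0.34;|;Dis.;0.56'
parts = line.split(';|;')
# parts[0] -> event_timestamp: '2024-11-06 04:48:56'
# parts[1] -> battery_level 12.4, temperature 21.7
# parts[2:] -> one entry per node; 'Dis.' becomes the -9999 sentinel:
# nodes_jsonb = '[{"num": 1, "val0": 0.12, "val1": 0.34},
#                 {"num": 2, "val0": -9999, "val1": 0.56}]'
```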
diff --git a/utils/config/set_config.py b/utils/config/set_config.py
index e206728..cbfe2a9 100644
--- a/utils/config/set_config.py
+++ b/utils/config/set_config.py
@@ -38,9 +38,13 @@
         self.dbrectable = c.get("db", "recTableName")
 
         # unit setting
-        self.headers = {key: int(value) for pair in c.get("unit", "Headers").split('|') for key, value in [pair.split(':')]}
         self.units_name = [part for part in c.get("unit", "Names").split('|')]
+        self.units_type = [part for part in c.get("unit", "Types").split('|')]
+        # self.units_header = {key: int(value) for pair in c.get("unit", "Headers").split('|') for key, value in [pair.split(':')]}
 
         # tool setting
         self.tools_name = [part for part in c.get("tool", "Names").split('|')]
         self.tools_type = [part for part in c.get("tool", "Types").split('|')]
+
+        # csv info
+        self.csv_infos = [part for part in c.get("csv", "Infos").split('|')]
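The new [csv] section is parsed the same way as the existing unit/tool keys; a minimal check of the added config path (assuming ftpcsvreceiver.ini is in the working directory):

```python
import configparser

c = configparser.ConfigParser()
c.read('ftpcsvreceiver.ini')
print(c.get('csv', 'Infos').split('|'))
# -> ['IP', 'Subnet', 'Gateway', 'Web port', 'Ftp port']
```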