diff --git a/FtpCsvReceiver.py b/FtpCsvReceiver.py
index f70579f..63b5658 100755
--- a/FtpCsvReceiver.py
+++ b/FtpCsvReceiver.py
@@ -115,7 +115,7 @@ class ASEHandler(FTPHandler):
                 conn.commit()
                 conn.close()
-            except Exception as e:
+            except psycopg2.Error as e:
                 logging.error(f'File {file} not loaded. Held in user path.')
                 logging.error(f'{e}')
         else:
@@ -161,7 +161,7 @@ class ASEHandler(FTPHandler):
             conn.close()
             logging.info("User {} created.".format(user))
             self.respond('200 SITE ADDU successful.')
-        except Exception as e:
+        except psycopg2.Error as e:
             self.respond('501 SITE ADDU failed.')
             print(e)
@@ -186,8 +186,7 @@ class ASEHandler(FTPHandler):
             logging.info("User {} deleted.".format(user))
             self.respond('200 SITE DELU successful.')
-
-        except Exception as e:
+        except psycopg2.Error as e:
             self.respond('501 SITE DELU failed.')
             print(e)
@@ -204,8 +203,11 @@ class ASEHandler(FTPHandler):
         # Create a cursor
         cur = conn.cursor()
-        cur.execute(f"UPDATE {cfg.dbschema}.{cfg.dbusertable} SET deleted_at = null WHERE ftpuser = '{user}'")
-        conn.commit()
+        try:
+            cur.execute(f"UPDATE {cfg.dbschema}.{cfg.dbusertable} SET deleted_at = null WHERE ftpuser = '{user}'")
+            conn.commit()
+        except psycopg2.Error as e:
+            logging.error("Update DB failed: {}".format(e))

         cur.execute(f"SELECT ftpuser, hash, virtpath, perm FROM {cfg.dbschema}.{cfg.dbusertable} WHERE ftpuser = '{user}'")
diff --git a/dbddl/dataraw.ddl b/dbddl/dataraw.ddl
new file mode 100644
index 0000000..c504d54
--- /dev/null
+++ b/dbddl/dataraw.ddl
@@ -0,0 +1,38 @@
+CREATE TABLE public.dataraw
+(
+    id serial4 NOT NULL,
+    unit_name text NULL,
+    unit_type text NULL,
+    tool_name text NULL,
+    tool_type text NULL,
+    unit_ip text NULL,
+    unit_subnet text NULL,
+    unit_gateway text NULL,
+    event_timestamp timestamp NULL,
+    battery_level float8 NULL,
+    temperature float8 NULL,
+    nodes_jsonb jsonb NULL,
+    created_at timestamp DEFAULT CURRENT_TIMESTAMP NULL,
+    updated_at timestamp NULL,
+    CONSTRAINT dataraw_pk PRIMARY KEY (id),
+    CONSTRAINT dataraw_unique UNIQUE (unit_name, tool_name, event_timestamp)
+);
+
+
+CREATE OR REPLACE FUNCTION public.update_updated_at_column()
+    RETURNS trigger
+    LANGUAGE plpgsql
+AS $function$
+BEGIN
+    NEW.updated_at = now();
+RETURN NEW;
+END;
+$function$
+;
+
+
+CREATE TRIGGER update_updated_at BEFORE
+UPDATE
+    ON dataraw FOR EACH ROW
+EXECUTE PROCEDURE
+    update_updated_at_column();
\ No newline at end of file
diff --git a/dbddl/dataraw.sql b/dbddl/dataraw.sql
deleted file mode 100644
index b486800..0000000
--- a/dbddl/dataraw.sql
+++ /dev/null
@@ -1,18 +0,0 @@
-DROP TABLE public.dataraw;
-
-CREATE TABLE public.dataraw
-(
-    id serial4 NOT NULL,
-    unit_name text NULL,
-    unit_type text NULL,
-    tool_name text NULL,
-    tool_type text NULL,
-    ip_centralina text NULL,
-    ip_gateway text NULL,
-    event_timestamp timestamp NULL,
-    battery_level float8 NULL,
-    temperature float8 NULL,
-    nodes_jsonb text NULL,
-    CONSTRAINT dataraw_pk PRIMARY KEY (id),
-    CONSTRAINT dataraw_unique UNIQUE (unit_name, tool_name, event_timestamp)
-);
\ No newline at end of file
diff --git a/dbddl/received.sql b/dbddl/received.ddl
similarity index 100%
rename from dbddl/received.sql
rename to dbddl/received.ddl
diff --git a/dbddl/virtusers.sql b/dbddl/virtusers.ddl
similarity index 100%
rename from dbddl/virtusers.sql
rename to dbddl/virtusers.ddl
diff --git a/ftpcsvreceiver.ini b/ftpcsvreceiver.ini
index 7ccf9d4..c76c73e 100644
--- a/ftpcsvreceiver.ini
+++ b/ftpcsvreceiver.ini
@@ -28,6 +28,7 @@
 dbSchema = public
 userTableName = virtusers
 recTableName = received
+rawTableName = dataraw

 [unit]
 Types = G801|G201|G301|G802|D2W|GFLOW|CR1000X|TLP|GS1
@@ -38,4 +39,4 @@
 Names = LOC[0-9]{4}|DT[0-9]{4}|GD[0-9]{4}|[0-9]{18}|measurement

 [csv]
-Infos = IP|Subnet|Gateway|Web port|Ftp port
+Infos = IP|Subnet|Gateway
diff --git a/mqtt_pub.py b/mqtt_pub.py
new file mode 100755
index 0000000..f1a7d8e
--- /dev/null
+++ b/mqtt_pub.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python3
+import paho.mqtt.client as mqtt
+import time
+import ssl
+
+version = '5' # or '3'
+mytransport = 'tcp' # or 'websockets'
+
+if version == '5':
+    mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
+                        client_id="myPy",
+                        transport=mytransport,
+                        protocol=mqtt.MQTTv5)
+if version == '3':
+    mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
+                        client_id="myPy",
+                        transport=mytransport,
+                        protocol=mqtt.MQTTv311,
+                        clean_session=True)
+
+mqttc.username_pw_set("alex", "BatManu#171017")
+
+'''client.tls_set(certfile=None,
+                  keyfile=None,
+                  cert_reqs=ssl.CERT_REQUIRED)'''
+
+def on_message(client, obj, message, properties=None):
+    print(" Received message " + str(message.payload)
+          + " on topic '" + message.topic
+          + "' with QoS " + str(message.qos))
+'''
+def on_connect(client, obj, flags, reason_code, properties):
+    print("reason_code: " + str(reason_code))
+
+def on_publish(client, obj, mid, reason_code, properties):
+    print("mid: " + str(mid))
+
+def on_log(client, obj, level, string):
+    print(string)
+'''
+
+mqttc.on_message = on_message;
+'''
+client.on_connect = mycallbacks.on_connect;
+client.on_publish = mycallbacks.on_publish;
+client.on_subscribe = mycallbacks.on_subscribe;
+'''
+
+broker = 'mqtt'
+myport = 1883
+if version == '5':
+    from paho.mqtt.properties import Properties
+    from paho.mqtt.packettypes import PacketTypes
+    properties=Properties(PacketTypes.CONNECT)
+    properties.SessionExpiryInterval=30*60 # in seconds
+    mqttc.connect(broker,
+                  port=myport,
+                  clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY,
+                  properties=properties,
+                  keepalive=60);
+
+elif version == '3':
+    mqttc.connect(broker,port=myport,keepalive=60);
+
+mqttc.loop_start();
\ No newline at end of file
diff --git a/run_trans.sh b/run_trans.sh
new file mode 100755
index 0000000..f12793e
--- /dev/null
+++ b/run_trans.sh
@@ -0,0 +1,4 @@
+for (( i=1; i<=29000; i++ ))
+do
+./transform_file.py
+done
diff --git a/transform_file.py b/transform_file.py
index 3ea8870..cab7577 100755
--- a/transform_file.py
+++ b/transform_file.py
@@ -4,117 +4,148 @@
 import sys
 import os
 import re
-
+from datetime import datetime
 import json
 import psycopg2
+from psycopg2.extras import execute_values
 import logging
-from sqlalchemy import create_engine, text
+import psycopg2.sql
+
 from utils.time import timestamp_fmt as ts
 from utils.config import set_config as setting

+def conn_db(cfg):
+    return psycopg2.connect(dbname=cfg.dbname, user=cfg.dbuser, password=cfg.dbpass, host=cfg.dbhost, port=cfg.dbport )
+
 def extract_value(patterns, source, default='Not Defined'):
     ip = {}
     for pattern in patterns:
-        pattern = f'r"{pattern}:\s*(\d{1,3}(?:\.\d{1,3}){3})"'
-        matches = re.search(pattern, source, re.IGNORECASE)
+        s_pattern = rf'{pattern}:\s*(\d{{1,3}}(?:\.\d{{1,3}}){{3}})'
+        matches = re.search(s_pattern, source, re.IGNORECASE)
         if matches:
             ip.update({pattern: matches.group(1)})
         else:
             ip.update({pattern: default})
     return ip

-def write_db(engine, records):
-    with engine.connect() as conn:
-        conn.execute(text("""
-            INSERT INTO dataraw (unit_name, unit_type, tool_name, tool_type, ip_centralina, ip_gateway, event_timestamp, battery_level, temperature, nodes_jsonb)
-            VALUES
-            """ + ",".join([
-                f"(:{i}_unit_name, :{i}_unit_type, :{i}_tool_name, :{i}_tool_type, :{i}_ip_centralina, :{i}_ip_gateway, :{i}_event_timestamp, :{i}_battery_level, :{i}_temperature, :{i}_nodes_jsonb)"
-                for i in range(len(records))
-            ]) + """
-            ON CONFLICT ON CONSTRAINT dataraw_unique
-            DO UPDATE SET
-                unit_type = EXCLUDED.unit_type,
-                tool_type = EXCLUDED.tool_type,
-                ip_centralina = EXCLUDED.ip_centralina,
-                ip_gateway = EXCLUDED.ip_gateway,
-                battery_level = EXCLUDED.battery_level,
-                temperature = EXCLUDED.temperature,
-                nodes_jsonb = EXCLUDED.nodes_jsonb;
-        """), {f"{i}_{key}": value for i, record in enumerate(records) for key, value in record.items()})
+def write_db(records, cfg):
+    insert_values = [
+        (
+            record["unit_name"], record["unit_type"], record["tool_name"], record["tool_type"],
+            record["unit_ip"], record["unit_subnet"], record["unit_gateway"], record["event_timestamp"],
+            record["battery_level"], record["temperature"], record["nodes_jsonb"]
+        )
+        for record in records
+    ]

-        conn.commit()
+    query = f"""
+        INSERT INTO {cfg.dbschema}.{cfg.dbdataraw} (
+            unit_name, unit_type, tool_name, tool_type, unit_ip, unit_subnet, unit_gateway,
+            event_timestamp, battery_level, temperature, nodes_jsonb
+        )
+        VALUES %s
+        ON CONFLICT ON CONSTRAINT dataraw_unique
+        DO UPDATE SET
+            unit_type = EXCLUDED.unit_type,
+            tool_type = EXCLUDED.tool_type,
+            unit_ip = EXCLUDED.unit_ip,
+            unit_subnet = EXCLUDED.unit_subnet,
+            unit_gateway = EXCLUDED.unit_gateway,
+            battery_level = EXCLUDED.battery_level,
+            temperature = EXCLUDED.temperature,
+            nodes_jsonb = EXCLUDED.nodes_jsonb;
+    """

-def elab_csv(engine, cfg):
-    with engine.connect() as conn:
-        cur = conn.cursor()
-        cur.execute(f'select unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbrectable } r ')
+    try:
+        with conn_db(cfg) as conn:
+            conn.autocommit = True
+            with conn.cursor() as cur:
+                try:
+                    execute_values(cur, query, insert_values)
+                    cur.close()
+                    conn.commit()
+                except psycopg2.Error as e:
+                    logging.error(f'Records not inserted: {e}')
+                    logging.info(f'Exit')
+                    exit()
+    except Exception as e:
+        logging.error(f'Records not inserted: {e}')
+        exit()

-        unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone()
-        data_list = str(tool_data).strip("('{\"").strip("\"}\',)").split('","')
-
-        # Extract the IP information from the header
-        infos = extract_value(cfg.csv_infos, data_list[:9])
+def elab_csv(cfg):
+    try:
+        with conn_db(cfg) as conn:
+            cur = conn.cursor()
+            cur.execute(f'select id, unit_name, unit_type, tool_name, tool_type, tool_data from {cfg.dbschema}.{cfg.dbrectable} where locked = 0 and status = 0')
+            id, unit_name, unit_type, tool_name, tool_type, tool_data = cur.fetchone()
+            cur.execute(f'update {cfg.dbschema}.{cfg.dbrectable} set locked = 1 where id = {id}')
+            data_list = str(tool_data).strip("('{\"").strip("\"}\',)").split('","')
+            # Extract the IP information from the header
+            infos = extract_value(cfg.csv_infos, str(data_list[:9]))
+    except Exception as e:
+        logging.error(f'{e}')

     records = []
-    # Process the data rows from row 8 onwards
-    for line in data_list:
-        if ";|;" in line:
-            # Remove whitespace or newline characters
-            input_data = line.strip().replace('\\n', '')
+    # Pattern definitions
+    timestamp_pattern1 = r'(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2});'
+    timestamp_pattern2 = r'(\d{2}/\d{2}/\d{4} \d{2}:\d{2}:\d{2});'

-            # Split the string into sections using ";|;" as separator
-            parts = input_data.split(';|;')
+    # Desired timestamp format
+    output_format = "%Y-%m-%d %H:%M:%S"

-            # Check that there are at least three parts (timestamp, measurements and nodes)
-            if len(parts) < 3:
-                print(f"Riga non valida: {input_data}")
-                continue
+    for line in list(set(data_list)):
+        if (match := re.search(timestamp_pattern1, line)):
+            timestamp = datetime.strptime(match.group(1), "%Y/%m/%d %H:%M:%S").strftime(output_format)
+        elif (match := re.search(timestamp_pattern2, line)):
+            timestamp = datetime.strptime(match.group(1), "%d/%m/%Y %H:%M:%S").strftime(output_format)
+        else:
+            continue

-            # Extract the date/time and the first measurements
-            timestamp = parts[0]
-            measurements = parts[1]
+        line_without_timestamp = (line[match.end():]).strip('|;')

-            # Extract each node's values and format the data as JSON
-            nodes = parts[2:]
-            node_list = []
-            for i, node_data in enumerate(nodes, start=1):
-                node_dict = {"num": i}
-                # Split each node into values separated by ";"
-                node_values = node_data.split(';')
-                for j, value in enumerate(node_values, start=0):
-                    # Set values to -9999 when "Dis." is found
-                    node_dict['val' + str(j)] = -9999 if value == "Dis." else float(value)
-                node_list.append(node_dict)
+        match_values = re.findall(r'[-+]?\d*\.\d+|\d+', line_without_timestamp)
+        battery_level, temperature = match_values[0], match_values[1]
+        remainder = ";".join(line_without_timestamp.split(";")[2:]).strip('|;')

-            # Prepare the data for insert/update
-            record = {
-                "unit_name": unit_name.upper(),
-                "unit_type": unit_type.upper(),
-                "tool_name": tool_name.upper(),
-                "tool_type": tool_type.upper(),
-                "ip_centralina": infos['IP'],
-                "ip_subnet": infos['Subnet'],
-                "ip_gateway": infos['Gateway'],
-                "Web_port": infos['Web port'],
-                "Ftp_port": infos['Ftp port'],
-                "event_timestamp": timestamp,
-                "battery_level": float(measurements.split(';')[0]),
-                "temperature": float(measurements.split(';')[1]),
-                "nodes_jsonb": json.dumps(node_list) # Convert the list of dicts to a JSON string
-            }
+        # Remove whitespace or newline characters
+        nodes = remainder.strip().replace('\\n', '').split(";|;")

-            records.append(record)
+        # Extract each node's values and format the data as JSON
+        node_list = []
+        for i, node_data in enumerate(nodes, start=1):
+            node_dict = {"num": i}
+            # Split each node into values separated by ";"
+            node_values = node_data.split(';')
+            for j, value in enumerate(node_values, start=0):
+                # Set values to -9999 when "Dis." is found
+                node_dict['val' + str(j)] = -9999 if (value == "Dis." or value == "Err1" or value == "Err2" or value == "---" or value == "NotAv" or value == "No RX" or value == "DMUXe" or value == "CH n. Error" or value == "-") else float(value)
+            node_list.append(node_dict)

-            # Once we reach 500 records, run the batch insert
-            if len(records) >= 500:
-                print("raggiunti 500 record scrivo sul db")
-                write_db(engine, records)
-                records = []
+        # Prepare the data for insert/update
+        record = {
+            "unit_name": unit_name.upper(),
+            "unit_type": unit_type.upper(),
+            "tool_name": tool_name.upper(),
+            "tool_type": tool_type.upper(),
+            "unit_ip": infos['IP'],
+            "unit_subnet": infos['Subnet'],
+            "unit_gateway": infos['Gateway'],
+            "event_timestamp": timestamp,
+            "battery_level": float(battery_level),
+            "temperature": float(temperature),
+            "nodes_jsonb": json.dumps(node_list) # Convert the list of dicts to a JSON string
+        }

-    write_db(engine, records)
+        records.append(record)
+
+        # Once we reach 500 records, run the batch insert
+        if len(records) >= 500:
+            logging.info("Raggiunti 500 record scrivo sul DB")
+            write_db(records, cfg)
+            records = []
+    write_db(records, cfg)


 def main():
@@ -123,14 +154,13 @@ def main():
     try:
         # Configure the PostgreSQL database connection
-        engine = create_engine(f'postgresql://{cfg.dbuser}:{cfg.dbpass}@{cfg.dbhost}:{cfg.dbport}/{cfg.dbschema}')

         # Configure logging
         logging.basicConfig(
             format="%(asctime)s %(message)s",
-            filename=cfg.logfilename,
+            filename=cfg.elablog,
             level=logging.INFO,
         )
-        elab_csv(engine, cfg)
+        elab_csv(cfg)

     except KeyboardInterrupt:
         logging.info(
diff --git a/utils/config/set_config.py b/utils/config/set_config.py
index cbfe2a9..54c7549 100644
--- a/utils/config/set_config.py
+++ b/utils/config/set_config.py
@@ -36,6 +36,7 @@ class config:
         self.dbschema = c.get("db", "dbSchema")
         self.dbusertable = c.get("db", "userTableName")
         self.dbrectable = c.get("db", "recTableName")
+        self.dbdataraw = c.get("db", "rawTableName")

         # unit setting
         self.units_name = [part for part in c.get("unit", "Names").split('|')]